Line data Source code
1 : //# Partition.cc
2 : //# Copyright (C) 1996-2007
3 : //# Associated Universities, Inc. Washington DC, USA.
4 : //#
5 : //# This library is free software; you can redistribute it and/or modify
6 : //# it under the terms of the GNU General Public License as published by
7 : //# the Free Software Foundation; either version 2 of the License, or
8 : //# (at your option) any later version.
9 : //#
10 : //# This library is distributed in the hope that it will be useful,
11 : //# but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : //# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : //# GNU General Public License for more details.
14 : //#
15 : //# You should have received a copy of the GNU General Public License
16 : //# along with this library; if not, write to the Free Software
17 : //# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 : //#
19 : //# Correspondence concerning AIPS++ should be addressed as follows:
20 : //# Internet email: casa-feedback@nrao.edu.
21 : //# Postal address: AIPS++ Project Office
22 : //# National Radio Astronomy Observatory
23 : //# 520 Edgemont Road
24 : //# Charlottesville, VA 22903-2475 USA
25 : //#
26 : //# $Id: $
27 :
28 : // To make Timer reports in the inner loop of the simple copy,
29 : // uncomment the following line:
30 : //#define COPYTIMER
31 :
32 : #include <msvis/MSVis/Partition.h>
33 : #include <msvis/MSVis/SubMS.h>
34 : #include <casacore/ms/MSSel/MSSelection.h>
35 : //#include <ms/MSSel/MSTimeGram.h>
36 : //#include <tables/TaQL/ExprNode.h>
37 : #include <casacore/tables/Tables/RefRows.h>
38 : #include <casacore/ms/MeasurementSets/MSColumns.h>
39 : #include <casacore/ms/MeasurementSets/MSMainColumns.h>
40 : #include <casacore/coordinates/Coordinates/CoordinateUtil.h>
41 : #include <casacore/casa/Arrays/Matrix.h>
42 : #include <casacore/casa/Arrays/Cube.h>
43 : #include <casacore/casa/Arrays/ArrayMath.h>
44 : #include <casacore/casa/Arrays/ArrayLogical.h>
45 : #include <casacore/casa/Arrays/ArrayUtil.h>
46 : #include <casacore/casa/Arrays/IPosition.h>
47 : #include <casacore/casa/Arrays/Slice.h>
48 : #include <casacore/casa/Logging/LogIO.h>
49 : #include <casacore/casa/OS/File.h>
50 : #include <casacore/casa/OS/HostInfo.h>
51 : #include <casacore/casa/OS/Memory.h> // Can be commented out along with
52 : // // Memory:: calls.
53 :
54 : //#ifdef COPYTIMER
55 : #include <casacore/casa/OS/Timer.h>
56 : //#endif
57 :
58 : #include <casacore/casa/Containers/Record.h>
59 : #include <casacore/casa/BasicSL/String.h>
60 : #include <casacore/casa/Utilities/Assert.h>
61 : #include <casacore/casa/Utilities/GenSort.h>
62 : #include <casacore/casa/System/AppInfo.h>
63 : #include <casacore/casa/System/ProgressMeter.h>
64 : #include <casacore/casa/Quanta/QuantumHolder.h>
65 : #include <msvis/MSVis/VisSet.h>
66 : #include <msvis/MSVis/VisBuffer.h>
67 : #include <msvis/MSVis/VisChunkAverager.h>
68 : #include <msvis/MSVis/VisIterator.h>
69 : //#include <msvis/MSVis/VisibilityIterator.h>
70 : #include <casacore/tables/DataMan/IncrementalStMan.h>
71 : #include <casacore/tables/Tables/ScalarColumn.h>
72 : #include <casacore/tables/Tables/ScaColDesc.h>
73 : #include <casacore/tables/Tables/SetupNewTab.h>
74 : #include <casacore/tables/DataMan/StandardStMan.h>
75 : #include <casacore/tables/Tables/Table.h>
76 : #include <casacore/tables/Tables/PlainTable.h>
77 : #include <casacore/tables/Tables/TableDesc.h>
78 : #include <casacore/tables/Tables/TableInfo.h>
79 : #include <casacore/tables/Tables/TableLock.h>
80 : #include <casacore/tables/Tables/TableRecord.h>
81 : #include <casacore/tables/Tables/TableCopy.h>
82 : #include <casacore/tables/Tables/TableRow.h>
83 : #include <casacore/tables/DataMan/TiledColumnStMan.h>
84 : #include <casacore/tables/DataMan/TiledShapeStMan.h>
85 : #include <casacore/tables/DataMan/TiledDataStMan.h>
86 : #include <casacore/tables/DataMan/TiledStManAccessor.h>
87 : #include <casacore/ms/MeasurementSets/MSTileLayout.h>
88 : #include <sstream>
89 : #include <iomanip>
90 : #include <functional>
91 : #include <casacore/measures/Measures/MeasTable.h>
92 : #include <casacore/casa/Quanta/MVTime.h>
93 :
94 : using namespace casacore;
95 : namespace casa {
96 :
97 : //typedef ROVisibilityIterator ROVisIter;
98 : //typedef VisibilityIterator VisIter;
99 :
100 0 : Partition::Partition(String& theMS, Table::TableOption option) :
101 0 : ms_p(MeasurementSet(theMS, option)),
102 0 : mssel_p(ms_p),
103 0 : msc_p(NULL),
104 0 : mscIn_p(NULL),
105 : // sameShape_p(true),
106 0 : antennaSel_p(false),
107 0 : timeBin_p(-1.0),
108 0 : scanString_p(""),
109 0 : intentString_p(""),
110 0 : obsString_p(""),
111 0 : uvrangeString_p(""),
112 0 : taqlString_p(""),
113 0 : timeRange_p(""),
114 0 : arrayExpr_p(""),
115 0 : combine_p(""),
116 0 : maxnchan_p(1),
117 0 : maxncorr_p(1)
118 : {
119 0 : }
120 :
121 0 : Partition::Partition(MeasurementSet& ms) :
122 0 : ms_p(ms),
123 0 : mssel_p(ms_p),
124 0 : msc_p(NULL),
125 0 : mscIn_p(NULL),
126 : //sameShape_p(true),
127 0 : antennaSel_p(false),
128 0 : timeBin_p(-1.0),
129 0 : scanString_p(""),
130 0 : intentString_p(""),
131 0 : obsString_p(""),
132 0 : uvrangeString_p(""),
133 0 : taqlString_p(""),
134 0 : timeRange_p(""),
135 0 : arrayExpr_p(""),
136 0 : combine_p(""),
137 0 : maxnchan_p(1),
138 0 : maxncorr_p(1)
139 : {
140 0 : }
141 :
142 0 : Partition::~Partition()
143 : {
144 0 : if(!msOut_p.isNull())
145 0 : msOut_p.flush();
146 :
147 0 : delete msc_p;
148 0 : msc_p = NULL;
149 :
150 0 : delete mscIn_p;
151 0 : mscIn_p = NULL;
152 :
153 0 : msOut_p = MeasurementSet();
154 :
155 : // parseColumnNames unavoidably has a static String and Vector<MS::PredefinedColumns>.
156 : // Collapse them down to free most of that memory.
157 0 : SubMS::parseColumnNames("None");
158 0 : }
159 :
160 0 : Bool Partition::selectSpw(const String& spwstr)
161 : {
162 0 : LogIO os(LogOrigin("Partition", "selectSpw()"));
163 :
164 : // Partitioning by definition requires the output subtables to be the same as
165 : // the subset ones, with the side-effect that channel selection is not
166 : // allowed. spw selection is OK, though, so frisk spwstr for colons.
167 0 : if(spwstr.contains(":")){
168 : os << LogIO::SEVERE
169 : << "Partitioning does not permit selecting by channel. (Selecting by spw is OK)\n"
170 0 : << LogIO::POST;
171 0 : return false;
172 : }
173 :
174 0 : MSSelection mssel;
175 0 : String myspwstr(spwstr == "" ? "*" : spwstr);
176 :
177 0 : mssel.setSpwExpr(myspwstr);
178 :
179 : // Each row should have spw, start, stop, step
180 0 : Matrix<Int> chansel = mssel.getChanList(&ms_p, 1);
181 :
182 0 : if(chansel.nrow() > 0){ // Use myspwstr if it selected anything...
183 0 : spw_p = chansel.column(0);
184 : }
185 : else{ // select everything
186 0 : MSSpWindowColumns mySpwTab(ms_p.spectralWindow());
187 :
188 0 : spw_p.resize(mySpwTab.nrow());
189 0 : indgen(spw_p);
190 0 : }
191 :
192 : // Check for and filter out selected spws that aren't included in
193 : // DATA_DESCRIPTION. (See CAS-1673 for an example.)
194 0 : std::set<Int> badSelSpwSlots(SubMS::findBadSpws(ms_p, spw_p));
195 0 : uInt nbadSelSpwSlots = badSelSpwSlots.size();
196 0 : if(nbadSelSpwSlots > 0){
197 0 : os << LogIO::WARN << "Selected input spw(s)\n";
198 0 : for(std::set<Int>::iterator bbit = badSelSpwSlots.begin();
199 0 : bbit != badSelSpwSlots.end(); ++bbit)
200 0 : os << spw_p[*bbit] << " ";
201 : os << "\nwere not found in DATA_DESCRIPTION and are being excluded."
202 0 : << LogIO::POST;
203 :
204 0 : uInt nSelSpw = spw_p.nelements();
205 0 : uInt ngoodSelSpwSlots = nSelSpw - nbadSelSpwSlots;
206 0 : Vector<Int> spwc(ngoodSelSpwSlots);
207 0 : std::set<Int>::iterator bsend = badSelSpwSlots.end();
208 :
209 0 : uInt j = 0;
210 0 : for(uInt k = 0; k < nSelSpw; ++k){
211 0 : if(badSelSpwSlots.find(k) == bsend){
212 0 : spwc[j] = spw_p[k];
213 0 : ++j;
214 : }
215 : }
216 0 : spw_p.resize(ngoodSelSpwSlots);
217 0 : spw_p = spwc;
218 0 : }
219 0 : return true;
220 0 : }
221 :
222 0 : Bool Partition::setmsselect(const String& spw, const String& field,
223 : const String& baseline, const String& scan,
224 : const String& uvrange, const String& taql,
225 : const String& subarray, const String& intent,
226 : const String& obs)
227 : {
228 0 : LogIO os(LogOrigin("Partition", "setmsselect()"));
229 : Bool ok;
230 :
231 0 : String myspwstr(spw == "" ? "*" : spw);
232 0 : Record selrec = ms_p.msseltoindex(myspwstr, field);
233 :
234 0 : ok = selectSource(selrec.asArrayInt("field"));
235 :
236 : // All of the requested selection functions will be tried, even if an
237 : // earlier one has indicated its failure. This allows all of the selection
238 : // strings to be tested, yielding more complete feedback for the user
239 : // (fewer retries). This is a matter of taste, though. If the selections
240 : // turn out to be slow, this function should return on the first false.
241 :
242 0 : if(!selectSpw(myspwstr)){
243 0 : os << LogIO::SEVERE << "No channels selected." << LogIO::POST;
244 0 : ok = false;
245 : }
246 :
247 0 : if(baseline != ""){
248 0 : Vector<Int> antid(0);
249 0 : Vector<String> antstr(1,baseline);
250 0 : selectAntenna(antid, antstr);
251 0 : }
252 0 : scanString_p = scan;
253 0 : intentString_p = intent;
254 0 : obsString_p = obs;
255 0 : uvrangeString_p = uvrange;
256 0 : taqlString_p = taql;
257 :
258 0 : if(subarray != "")
259 0 : selectArray(subarray);
260 :
261 0 : return ok;
262 0 : }
263 :
264 0 : Bool Partition::selectSource(const Vector<Int>& fieldid)
265 : {
266 0 : LogIO os(LogOrigin("Partition", "selectSource()"));
267 0 : Bool cando = true;
268 :
269 0 : if(fieldid.nelements() < 1){
270 0 : fieldid_p = Vector<Int>(1, -1);
271 : }
272 0 : else if(fieldid.nelements() > ms_p.field().nrow()){
273 : os << LogIO::SEVERE
274 : << "More fields were requested than are in the input MS.\n"
275 0 : << LogIO::POST;
276 0 : cando = false;
277 : }
278 0 : else if(max(fieldid) >= static_cast<Int>(ms_p.field().nrow())){
279 : // Arriving here is very unlikely since if fieldid came from MSSelection
280 : // bad fields were presumably already quietly dropped.
281 : os << LogIO::SEVERE
282 : << "At least 1 field was requested that is not in the input MS.\n"
283 0 : << LogIO::POST;
284 0 : cando = false;
285 : }
286 : else{
287 0 : fieldid_p = fieldid;
288 : }
289 :
290 0 : if(fieldid_p.nelements() == 1 && fieldid_p[0] < 0){
291 0 : fieldid_p.resize(ms_p.field().nrow());
292 0 : indgen(fieldid_p);
293 : }
294 0 : return cando;
295 0 : }
296 :
297 0 : void Partition::selectArray(const String& subarray)
298 : {
299 0 : arrayExpr_p = subarray;
300 0 : if(arrayExpr_p == "") // Zap any old ones.
301 0 : arrayId_p.resize();
302 : // else arrayId_p will get set in makeSelection().
303 0 : }
304 :
305 0 : void Partition::selectTime(Double timeBin, String timerng)
306 : {
307 0 : timeBin_p = timeBin;
308 0 : timeRange_p = timerng;
309 0 : }
310 :
311 :
312 0 : Bool Partition::makePartition(String& msname, String& colname,
313 : const Vector<Int>& tileShape, const String& combine)
314 : {
315 0 : LogIO os(LogOrigin("Partition", "makePartition()"));
316 : try{
317 0 : if((spw_p.nelements()>0) && (max(spw_p) >= Int(ms_p.spectralWindow().nrow()))){
318 : os << LogIO::SEVERE
319 : << "SpectralWindow selection contains elements that do not exist in "
320 : << "this MS"
321 0 : << LogIO::POST;
322 0 : ms_p=MeasurementSet();
323 0 : return false;
324 : }
325 :
326 : // Watch out! This throws an AipsError if ms_p doesn't have the
327 : // requested columns.
328 : const Vector<MS::PredefinedColumns> colNamesTok = SubMS::parseColumnNames(colname,
329 0 : ms_p);
330 :
331 0 : if(!makeSelection()){
332 : os << LogIO::SEVERE
333 : << "Failed on selection: the combination of spw, field, antenna, correlation, "
334 : << "and timerange may be invalid."
335 0 : << LogIO::POST;
336 0 : ms_p = MeasurementSet();
337 0 : return false;
338 : }
339 0 : mscIn_p = new MSColumns(mssel_p);
340 : // Note again the parseColumnNames() a few lines back that stops setupMS()
341 : // from being called if the MS doesn't have the requested columns.
342 0 : MeasurementSet* outpointer=0;
343 :
344 0 : if(tileShape.nelements() == 3){
345 0 : outpointer = setupMS(msname, mssel_p, maxnchan_p, maxncorr_p,
346 : colNamesTok, tileShape);
347 :
348 : }
349 : // the following calls MSTileLayout... disabled for now because it
350 : // forces tiles to be the full spw bandwidth in width (gmoellen, 2010/11/07)
351 : /*
352 : else if((tileShape.nelements()==1) && (tileShape[0]==0 || tileShape[0]==1)){
353 : outpointer = setupMS(msname, nchan_p[0], ncorr_p[0],
354 : mscIn_p->observation().telescopeName()(0),
355 : colNamesTok, tileShape[0]);
356 : }
357 : */
358 : else{
359 :
360 : // Derive tile shape based on input dataset's tiles, borrowed
361 : // from VisSet's scr col tile shape derivation
362 : // (this may need some tweaking for averaging cases)
363 0 : TableDesc td = mssel_p.actualTableDesc();
364 :
365 : // If a non-DATA column, i.e. CORRECTED_DATA, is being written to DATA,
366 : // datacolname must be set to DATA because the tile management in
367 : // setupMS() will look for "TiledDATA", not "TiledCorrectedData".
368 0 : String datacolname = MS::columnName(MS::DATA);
369 : // But if DATA is not present in the input MS, using it would cause a
370 : // segfault.
371 0 : if(!td.isColumn(datacolname))
372 : // This is could be any other kind of *DATA column, including
373 : // FLOAT_DATA or LAG_DATA, but it is guaranteed to be something.
374 0 : datacolname = MS::columnName(colNamesTok[0]);
375 :
376 0 : const ColumnDesc& cdesc = td[datacolname];
377 :
378 0 : String dataManType = cdesc.dataManagerType();
379 0 : String dataManGroup = cdesc.dataManagerGroup();
380 :
381 0 : Bool tiled = (dataManType.contains("Tiled"));
382 :
383 0 : if (tiled) {
384 0 : ROTiledStManAccessor tsm(mssel_p, dataManGroup);
385 0 : uInt nHyper = tsm.nhypercubes();
386 :
387 : // Test clause
388 : if(1){
389 : os << LogIO::DEBUG1
390 : << datacolname << "'s max cache size: "
391 : << tsm.maximumCacheSize() << " bytes.\n"
392 : << "\tnhypercubes: " << nHyper << ".\n"
393 : << "\ttshp of row 0: " << tsm.tileShape(0)
394 : << "\n\thypercube shape of row 0: " << tsm.hypercubeShape(0)
395 0 : << LogIO::POST;
396 : }
397 :
398 :
399 : // Find smallest tile shape
400 0 : Int highestProduct=-INT_MAX;
401 0 : Int highestId=0;
402 0 : for (uInt id=0; id < nHyper; id++) {
403 0 : IPosition tshp(tsm.getTileShape(id));
404 0 : Int product = tshp.product();
405 :
406 : os << LogIO::DEBUG2
407 : << "\thypercube " << id << ":\n"
408 : << "\t\ttshp: " << tshp << "\n"
409 : << "\t\thypercube shape: " << tsm.getHypercubeShape(id)
410 : << ".\n\t\tcache size: " << tsm.getCacheSize(id)
411 : << " buckets.\n\t\tBucket size: " << tsm.getBucketSize(id)
412 : << " bytes."
413 0 : << LogIO::POST;
414 :
415 0 : if (product > 0 && (product > highestProduct)) {
416 0 : highestProduct = product;
417 0 : highestId = id;
418 : }
419 0 : }
420 0 : Vector<Int> dataTileShape = tsm.getTileShape(highestId).asVector();
421 :
422 0 : outpointer = setupMS(msname, mssel_p, maxnchan_p, maxncorr_p,
423 : colNamesTok, dataTileShape);
424 :
425 0 : }
426 : else
427 : //Sweep all other cases of bad tileshape to a default one.
428 : // (this probably never happens)
429 0 : outpointer = setupMS(msname, mssel_p, maxnchan_p, maxncorr_p,
430 0 : mscIn_p->observation().telescopeName()(0),
431 : colNamesTok, 0);
432 :
433 0 : }
434 :
435 0 : combine_p = combine;
436 :
437 0 : msOut_p= *outpointer;
438 :
439 0 : if(!fillAllTables(colNamesTok)){
440 0 : delete outpointer;
441 0 : os << LogIO::WARN << msname << " left unfinished." << LogIO::POST;
442 0 : ms_p=MeasurementSet();
443 0 : return false;
444 : }
445 :
446 : // msOut_p.relinquishAutoLocks (true);
447 : // msOut_p.unlock();
448 : //Detaching the selected part
449 0 : ms_p=MeasurementSet();
450 :
451 0 : delete outpointer;
452 0 : return true;
453 0 : }
454 0 : catch(AipsError x){
455 0 : ms_p=MeasurementSet();
456 0 : throw(x);
457 0 : }
458 0 : catch(...){
459 0 : ms_p=MeasurementSet();
460 0 : throw(AipsError("Unknown exception caught"));
461 0 : }
462 :
463 0 : }
464 :
465 0 : MeasurementSet* Partition::makeScratchPartition(const String& colname,
466 : const Bool forceInMemory)
467 : {
468 0 : return makeScratchPartition(SubMS::parseColumnNames(colname, ms_p),
469 0 : forceInMemory);
470 : }
471 :
472 0 : MeasurementSet* Partition::makeScratchPartition(const Vector<MS::PredefinedColumns>& whichDataCols,
473 : const Bool forceInMemory)
474 : {
475 0 : LogIO os(LogOrigin("Partition", "makeScratchPartition()"));
476 :
477 0 : if(max(fieldid_p) >= Int(ms_p.field().nrow())){
478 : os << LogIO::SEVERE
479 : << "Field selection contains elements that do not exist in "
480 : << "this MS"
481 0 : << LogIO::POST;
482 0 : ms_p=MeasurementSet();
483 0 : return 0;
484 : }
485 0 : if(max(spw_p) >= Int(ms_p.spectralWindow().nrow())){
486 : os << LogIO::SEVERE
487 : << "SpectralWindow selection contains elements that do not exist in "
488 : << "this MS"
489 0 : << LogIO::POST;
490 0 : ms_p=MeasurementSet();
491 0 : return 0;
492 : }
493 :
494 0 : if(!makeSelection()){
495 : os << LogIO::SEVERE
496 : << "Failed on selection: combination of spw and/or field and/or time "
497 : << "chosen may be invalid."
498 0 : << LogIO::POST;
499 0 : ms_p=MeasurementSet();
500 0 : return 0;
501 : }
502 0 : mscIn_p=new MSColumns(mssel_p);
503 0 : Double sizeInMB= 1.5 * n_bytes() / (1024.0 * 1024.0);
504 0 : String msname=AppInfo::workFileName(uInt(sizeInMB), "TempPartition");
505 :
506 0 : MeasurementSet* outpointer = setupMS(msname, mssel_p, maxnchan_p, maxncorr_p,
507 0 : mscIn_p->observation().telescopeName()(0),
508 : whichDataCols);
509 :
510 0 : outpointer->markForDelete();
511 : //Hmmmmmm....memory......
512 0 : if(sizeInMB < (Double)(HostInfo::memoryTotal())/(2048.0)
513 0 : || forceInMemory){
514 0 : MeasurementSet* a = outpointer;
515 0 : outpointer= new MeasurementSet(a->copyToMemoryTable("TmpMemoryMS"));
516 0 : outpointer->initRefs();
517 0 : delete a;
518 : }
519 :
520 0 : msOut_p = *outpointer;
521 :
522 0 : if(!fillAllTables(whichDataCols)){
523 0 : delete outpointer;
524 0 : outpointer = 0;
525 0 : ms_p = MeasurementSet();
526 0 : return 0;
527 : }
528 : //Detaching the selected part
529 0 : ms_p=MeasurementSet();
530 0 : return outpointer;
531 0 : }
532 :
533 0 : Bool Partition::fillAllTables(const Vector<MS::PredefinedColumns>& datacols)
534 : {
535 0 : LogIO os(LogOrigin("Partition", "fillAllTables()"));
536 0 : Bool success = true;
537 :
538 : // Copy the subtables before doing anything with the main table. Otherwise
539 : // MSColumns won't work.
540 :
541 : // fill or update
542 0 : Timer timer;
543 :
544 : // Force the Measures frames for all the time type columns to have the same
545 : // reference as the TIME column of the main table.
546 : // Disable the empty table check (with false) because some of the subtables
547 : // (like POINTING) might already have been written.
548 : // However, empty tables are still empty after setting up the reference codes
549 : // here.
550 0 : msc_p = new MSMainColumns(msOut_p);
551 0 : msc_p->setEpochRef(MEpoch::castType(mscIn_p->timeMeas().getMeasRef().getType()),
552 : false);
553 :
554 : // UVW is the only other Measures column in the main table.
555 0 : msc_p->uvwMeas().setDescRefCode(Muvw::castType(mscIn_p->uvwMeas().getMeasRef().getType()));
556 :
557 0 : if(timeBin_p <= 0.0)
558 0 : success &= fillMainTable(datacols);
559 : else
560 0 : success &= doTimeAver(datacols);
561 0 : return success;
562 0 : }
563 :
564 0 : Bool Partition::makeSelection()
565 : {
566 0 : LogIO os(LogOrigin("Partition", "makeSelection()"));
567 :
568 : //VisSet/MSIter will check if the SORTED exists
569 : //and resort if necessary
570 : {
571 : // Matrix<Int> noselection;
572 : // VisSet vs(ms_p, noselection);
573 0 : Block<Int> sort;
574 0 : ROVisibilityIterator(ms_p, sort);
575 0 : }
576 :
577 : const MeasurementSet *elms;
578 0 : elms = &ms_p;
579 0 : MeasurementSet sorted;
580 0 : if(ms_p.keywordSet().isDefined("SORTED_TABLE")){
581 0 : sorted = ms_p.keywordSet().asTable("SORTED_TABLE");
582 : //If ms is not writable and sort is a subselection...use original ms
583 0 : if(ms_p.nrow() == sorted.nrow())
584 0 : elms = &sorted;
585 : }
586 :
587 0 : MSSelection thisSelection;
588 0 : if(fieldid_p.nelements() > 0)
589 0 : thisSelection.setFieldExpr(MSSelection::indexExprStr(fieldid_p));
590 0 : if(spw_p.nelements() > 0)
591 0 : thisSelection.setSpwExpr(MSSelection::indexExprStr(spw_p));
592 0 : if(antennaSel_p){
593 0 : if(antennaId_p.nelements() > 0){
594 0 : thisSelection.setAntennaExpr(MSSelection::indexExprStr( antennaId_p ));
595 : }
596 0 : if(antennaSelStr_p[0] != "")
597 0 : thisSelection.setAntennaExpr(MSSelection::nameExprStr( antennaSelStr_p));
598 : }
599 0 : if(timeRange_p != "")
600 0 : thisSelection.setTimeExpr(timeRange_p);
601 :
602 0 : thisSelection.setUvDistExpr(uvrangeString_p);
603 0 : thisSelection.setScanExpr(scanString_p);
604 0 : thisSelection.setStateExpr(intentString_p);
605 0 : thisSelection.setObservationExpr(obsString_p);
606 0 : if(arrayExpr_p != "")
607 0 : thisSelection.setArrayExpr(arrayExpr_p);
608 0 : if(corrString_p != "")
609 0 : thisSelection.setPolnExpr(corrString_p);
610 0 : thisSelection.setTaQLExpr(taqlString_p);
611 :
612 0 : TableExprNode exprNode = thisSelection.toTableExprNode(elms);
613 0 : selTimeRanges_p = thisSelection.getTimeList();
614 :
615 : // Find the maximum # of channels and correlations, for setting the
616 : // tileshape. VisIterator is horrible if the tileshape is too large.
617 : // Partition does not use VisIterator, so the max # of chans and corrs may
618 : // not be optimal. However, the # of chans and corrs for DDID 0 may be very
619 : // misrepresentative of the MS in general.
620 : {
621 0 : ScalarColumn<Int> polId(elms->dataDescription(),
622 0 : MSDataDescription::columnName(MSDataDescription::POLARIZATION_ID));
623 0 : ScalarColumn<Int> spwId(elms->dataDescription(),
624 0 : MSDataDescription::columnName(MSDataDescription::SPECTRAL_WINDOW_ID));
625 0 : ScalarColumn<Int> ncorr(elms->polarization(),
626 0 : MSPolarization::columnName(MSPolarization::NUM_CORR));
627 0 : ScalarColumn<Int> nchan(elms->spectralWindow(),
628 0 : MSSpectralWindow::columnName(MSSpectralWindow::NUM_CHAN));
629 :
630 0 : uInt nddids = polId.nrow();
631 0 : uInt nSelSpws = spw_p.nelements();
632 :
633 0 : maxnchan_p = 1;
634 0 : maxncorr_p = 1;
635 0 : for(uInt ddid = 0; ddid < nddids; ++ddid){
636 0 : Int spw = spwId(ddid);
637 0 : for(uInt k = 0; k < nSelSpws; ++k){
638 0 : if(spw == spw_p[k]){ // If spw is selected...
639 0 : if(nchan(spw) > maxnchan_p)
640 0 : maxnchan_p = nchan(spw);
641 0 : if(ncorr(polId(ddid)) > maxncorr_p)
642 0 : maxncorr_p = ncorr(polId(ddid));
643 : }
644 : }
645 : }
646 0 : }
647 :
648 : // Now remake the selected ms
649 0 : if(!(exprNode.isNull())){
650 0 : mssel_p = MeasurementSet((*elms)(exprNode));
651 : }
652 : else{
653 : // Null take all the ms ...setdata() blank means that
654 0 : mssel_p = MeasurementSet((*elms));
655 : }
656 : //mssel_p.rename(ms_p.tableName()+"/SELECTED_TABLE", Table::Scratch);
657 0 : if(mssel_p.nrow() == 0)
658 0 : return false;
659 :
660 0 : if(mssel_p.nrow() < ms_p.nrow()){
661 : os << LogIO::NORMAL
662 : << mssel_p.nrow() << " out of " << ms_p.nrow() << " rows are going to be"
663 : << " considered due to the selection criteria."
664 0 : << LogIO::POST;
665 : }
666 0 : return true;
667 0 : }
668 :
669 0 : MeasurementSet* Partition::setupMS(const String& outFileName, const MeasurementSet& inms,
670 : const Int nchan, const Int nCorr, const String& telescop,
671 : const Vector<MS::PredefinedColumns>& colNames,
672 : const Int obstype)
673 : {
674 : //Choose an appropriate tileshape
675 0 : IPosition dataShape(2, nCorr, nchan);
676 0 : IPosition tileShape = MSTileLayout::tileShape(dataShape, obstype, telescop);
677 0 : return setupMS(outFileName, inms, nchan, nCorr, colNames, tileShape.asVector());
678 0 : }
679 0 : MeasurementSet* Partition::setupMS(const String& outFileName, const MeasurementSet& inms,
680 : const Int nchan, const Int nCorr,
681 : const Vector<MS::PredefinedColumns>& colNamesTok,
682 : const Vector<Int>& tshape)
683 : {
684 :
685 0 : if(tshape.nelements() != 3)
686 0 : throw(AipsError("TileShape has to have 3 elements ") );
687 :
688 : // This is more to shush a compiler warning than to warn users.
689 0 : LogIO os(LogOrigin("Partition", "setupMS()"));
690 0 : if(tshape[0] != nCorr)
691 : os << LogIO::DEBUG1
692 0 : << "Warning: using " << tshape[0] << " from the tileshape instead of "
693 : << nCorr << " for the number of correlations."
694 0 : << LogIO::POST;
695 0 : if(tshape[1] != nchan)
696 : os << LogIO::DEBUG1
697 0 : << "Warning: using " << tshape[1] << " from the tileshape instead of "
698 : << nchan << " for the number of channels."
699 0 : << LogIO::POST;
700 :
701 : // Choose an appropriate tileshape
702 : //IPosition dataShape(2,nCorr,nchan);
703 : //IPosition tileShape = MSTileLayout::tileShape(dataShape,obsType, telescop);
704 : //////////////////
705 :
706 0 : IPosition tileShape(tshape);
707 :
708 : // Make the MS table
709 0 : TableDesc td = MS::requiredTableDesc();
710 :
711 0 : Vector<String> tiledDataNames;
712 :
713 : // Even though we know the data is going to be the same shape throughout I'll
714 : // still create a column that has a variable shape as this will permit MS's
715 : // with other shapes to be appended.
716 0 : uInt ncols = colNamesTok.nelements();
717 0 : const Bool mustWriteOnlyToData = SubMS::mustConvertToData(ncols, colNamesTok);
718 :
719 0 : if (mustWriteOnlyToData)
720 : {
721 :
722 0 : MS::addColumnToDesc(td, MS::DATA, 2);
723 0 : String hcolName=String("Tiled")+String("DATA");
724 0 : td.defineHypercolumn(hcolName, 3,
725 0 : stringToVector("DATA"));
726 0 : tiledDataNames.resize(1);
727 0 : tiledDataNames[0] = hcolName;
728 :
729 0 : }
730 : else{
731 :
732 0 : tiledDataNames.resize(ncols);
733 0 : for(uInt i = 0; i < ncols; ++i){
734 : // Unfortunately MS::PredefinedColumns aren't ordered so that I can just check if
735 : // colNamesTok[i] is in the "data range".
736 0 : if(colNamesTok[i] == MS::DATA ||
737 0 : colNamesTok[i] == MS::MODEL_DATA ||
738 0 : colNamesTok[i] == MS::CORRECTED_DATA ||
739 0 : colNamesTok[i] == MS::FLOAT_DATA ||
740 0 : colNamesTok[i] == MS::LAG_DATA)
741 0 : MS::addColumnToDesc(td, colNamesTok[i], 2);
742 : else
743 0 : throw(AipsError(MS::columnName(colNamesTok[i]) +
744 0 : " is not a recognized data column "));
745 :
746 0 : String hcolName = String("Tiled") + MS::columnName(colNamesTok[i]);
747 0 : td.defineHypercolumn(hcolName, 3,
748 0 : stringToVector(MS::columnName(colNamesTok[i])));
749 0 : tiledDataNames[i] = hcolName;
750 0 : }
751 :
752 : }
753 :
754 : // add this optional column because random group fits has a
755 : // weight per visibility
756 0 : MS::addColumnToDesc(td, MS::WEIGHT_SPECTRUM, 2);
757 :
758 : // td.defineHypercolumn("TiledDATA",3,
759 : // stringToVector(MS::columnName(MS::DATA)));
760 0 : td.defineHypercolumn("TiledFlag",3,
761 0 : stringToVector(MS::columnName(MS::FLAG)));
762 0 : td.defineHypercolumn("TiledFlagCategory",4,
763 0 : stringToVector(MS::columnName(MS::FLAG_CATEGORY)));
764 0 : td.defineHypercolumn("TiledWgtSpectrum",3,
765 0 : stringToVector(MS::columnName(MS::WEIGHT_SPECTRUM)));
766 0 : td.defineHypercolumn("TiledUVW",2,
767 0 : stringToVector(MS::columnName(MS::UVW)));
768 0 : td.defineHypercolumn("TiledWgt",2,
769 0 : stringToVector(MS::columnName(MS::WEIGHT)));
770 0 : td.defineHypercolumn("TiledSigma", 2,
771 0 : stringToVector(MS::columnName(MS::SIGMA)));
772 :
773 :
774 0 : SetupNewTable newtab(outFileName, td, Table::New);
775 :
776 0 : uInt cache_val=32768;
777 : // Set the default Storage Manager to be the Incr one
778 0 : IncrementalStMan incrStMan ("ISMData",cache_val);
779 0 : newtab.bindAll(incrStMan, true);
780 : //Override the binding for specific columns
781 :
782 0 : IncrementalStMan incrStMan0("Array_ID",cache_val);
783 0 : newtab.bindColumn(MS::columnName(MS::ARRAY_ID), incrStMan0);
784 0 : IncrementalStMan incrStMan1("EXPOSURE",cache_val);
785 0 : newtab.bindColumn(MS::columnName(MS::EXPOSURE), incrStMan1);
786 0 : IncrementalStMan incrStMan2("FEED1",cache_val);
787 0 : newtab.bindColumn(MS::columnName(MS::FEED1), incrStMan2);
788 0 : IncrementalStMan incrStMan3("FEED2",cache_val);
789 0 : newtab.bindColumn(MS::columnName(MS::FEED2), incrStMan3);
790 0 : IncrementalStMan incrStMan4("FIELD_ID",cache_val);
791 0 : newtab.bindColumn(MS::columnName(MS::FIELD_ID), incrStMan4);
792 : // jagonzal (CAS-6746): Don't use IncrementalStMan with RW cols ///////////////////////////////////////////////////
793 : //IncrementalStMan incrStMan5("FLAG_ROW",cache_val/4);
794 : //newtab.bindColumn(MS::columnName(MS::FLAG_ROW), incrStMan5);
795 0 : StandardStMan aipsStManFlagRow("FLAG_ROW", cache_val/4);
796 0 : newtab.bindColumn(MS::columnName(MS::FLAG_ROW), aipsStManFlagRow);
797 : ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
798 0 : IncrementalStMan incrStMan6("INTERVAL",cache_val);
799 0 : newtab.bindColumn(MS::columnName(MS::INTERVAL), incrStMan6);
800 0 : IncrementalStMan incrStMan7("OBSERVATION_ID",cache_val);
801 0 : newtab.bindColumn(MS::columnName(MS::OBSERVATION_ID), incrStMan7);
802 0 : IncrementalStMan incrStMan8("PROCESSOR_ID",cache_val);
803 0 : newtab.bindColumn(MS::columnName(MS::PROCESSOR_ID), incrStMan8);
804 0 : IncrementalStMan incrStMan9("SCAN_NUMBER",cache_val);
805 0 : newtab.bindColumn(MS::columnName(MS::SCAN_NUMBER), incrStMan9);
806 0 : IncrementalStMan incrStMan10("STATE_ID",cache_val);
807 0 : newtab.bindColumn(MS::columnName(MS::STATE_ID), incrStMan10);
808 0 : IncrementalStMan incrStMan11("TIME",cache_val);
809 0 : newtab.bindColumn(MS::columnName(MS::TIME), incrStMan11);
810 0 : IncrementalStMan incrStMan12("TIME_CENTROID",cache_val);
811 0 : newtab.bindColumn(MS::columnName(MS::TIME_CENTROID), incrStMan12);
812 :
813 : // Bind ANTENNA1, ANTENNA2 and DATA_DESC_ID to the standardStMan
814 : // as they may change sufficiently frequently to make the
815 : // incremental storage manager inefficient for these columns.
816 :
817 :
818 0 : StandardStMan aipsStMan0("ANTENNA1", cache_val);
819 0 : newtab.bindColumn(MS::columnName(MS::ANTENNA1), aipsStMan0);
820 0 : StandardStMan aipsStMan1("ANTENNA2", cache_val);
821 0 : newtab.bindColumn(MS::columnName(MS::ANTENNA2), aipsStMan1);
822 0 : StandardStMan aipsStMan2("DATA_DESC_ID", cache_val);
823 0 : newtab.bindColumn(MS::columnName(MS::DATA_DESC_ID), aipsStMan2);
824 :
825 :
826 : // itsLog << LogOrigin("MSFitsInput", "setupMeasurementSet");
827 : //itsLog << LogIO::NORMAL << "Using tile shape "<<tileShape <<" for "<<
828 : // array_p<<" with obstype="<< obsType<<LogIO::POST;
829 :
830 : // TiledShapeStMan tiledStMan1("TiledData",tileShape);
831 0 : TiledShapeStMan tiledStMan1f("TiledFlag",tileShape);
832 : TiledShapeStMan tiledStMan1fc("TiledFlagCategory",
833 0 : IPosition(4,tileShape(0),tileShape(1),1,
834 0 : tileShape(2)));
835 0 : TiledShapeStMan tiledStMan2("TiledWgtSpectrum",tileShape);
836 0 : TiledColumnStMan tiledStMan3("TiledUVW",IPosition(2, 3, (tileShape(0) * tileShape(1) * tileShape(2)) / 3));
837 : TiledShapeStMan tiledStMan4("TiledWgt",
838 0 : IPosition(2,tileShape(0), tileShape(1) * tileShape(2)));
839 : TiledShapeStMan tiledStMan5("TiledSigma",
840 0 : IPosition(2,tileShape(0), tileShape(1) * tileShape(2)));
841 :
842 : // Bind the DATA, FLAG & WEIGHT_SPECTRUM columns to the tiled stman
843 :
844 0 : if (mustWriteOnlyToData){
845 0 : TiledShapeStMan tiledStMan1Data("TiledDATA",tileShape);
846 :
847 0 : newtab.bindColumn(MS::columnName(MS::DATA), tiledStMan1Data);
848 0 : }
849 : else{
850 0 : for(uInt i = 0; i < ncols; ++i){
851 0 : TiledShapeStMan tiledStMan1Data(tiledDataNames[i], tileShape);
852 :
853 0 : newtab.bindColumn(MS::columnName(colNamesTok[i]), tiledStMan1Data);
854 0 : }
855 :
856 : }
857 0 : newtab.bindColumn(MS::columnName(MS::FLAG),tiledStMan1f);
858 0 : newtab.bindColumn(MS::columnName(MS::FLAG_CATEGORY),tiledStMan1fc);
859 0 : newtab.bindColumn(MS::columnName(MS::WEIGHT_SPECTRUM),tiledStMan2);
860 :
861 0 : newtab.bindColumn(MS::columnName(MS::UVW),tiledStMan3);
862 0 : newtab.bindColumn(MS::columnName(MS::WEIGHT),tiledStMan4);
863 0 : newtab.bindColumn(MS::columnName(MS::SIGMA),tiledStMan5);
864 :
865 : // Copy the subtables to the output table BEFORE converting it to an MS or
866 : // adding (additional) locking.
867 0 : Table realtab(newtab);
868 0 : TableCopy::copySubTables(realtab, inms);
869 0 : realtab.flush();
870 :
871 : // avoid lock overheads by locking the table permanently
872 0 : TableLock lock(TableLock::AutoLocking);
873 0 : MeasurementSet *ms = new MeasurementSet(realtab.tableName(), lock, Table::Update);
874 :
875 : { // Set the TableInfo
876 :
877 0 : TableInfo& info(ms->tableInfo());
878 0 : info.setType(TableInfo::type(TableInfo::MEASUREMENTSET));
879 0 : info.setSubType(String("UVFITS"));
880 : info.readmeAddLine
881 0 : ("This is a measurement set Table holding astronomical observations");
882 :
883 : }
884 :
885 0 : return ms;
886 0 : }
887 :
888 : // This method is currently not called in Partition. It should really be called
889 : // in setupMS, but that has been made into a static method and it cannot be
890 : // called there. The ms argument is unused, but it is there to preserve the
891 : // signature, even though it causes a compiler warning.
892 : //
893 0 : void Partition::verifyColumns(const MeasurementSet&, // ms,
894 : const Vector<MS::PredefinedColumns>& colNames)
895 : {
896 0 : LogIO os(LogOrigin("Partition", "verifyColumns()"));
897 0 : for(uInt i = 0; i < colNames.nelements(); ++i){
898 0 : if(!ms_p.tableDesc().isColumn(MS::columnName(colNames[i]))){
899 0 : ostringstream ostr;
900 0 : ostr << "Desired column (" << MS::columnName(colNames[i])
901 0 : << ") not found in the input MS (" << ms_p.tableName() << ").";
902 0 : throw(AipsError(ostr.str()));
903 0 : }
904 : }
905 0 : }
906 :
907 0 : Bool Partition::fillAccessoryMainCols(){
908 0 : LogIO os(LogOrigin("Partition", "fillAccessoryMainCols()"));
909 0 : uInt nrows = mssel_p.nrow();
910 :
911 0 : msOut_p.addRow(nrows, true);
912 :
913 : //#ifdef COPYTIMER
914 0 : Timer timer;
915 0 : timer.mark();
916 : //#endif
917 0 : msc_p->antenna1().putColumn(mscIn_p->antenna1().getColumn());
918 0 : msc_p->antenna2().putColumn(mscIn_p->antenna2().getColumn());
919 : os << LogIO::DEBUG1
920 : << "Copying ANTENNA* took " << timer.real() << "s."
921 0 : << LogIO::POST;
922 :
923 0 : timer.mark();
924 0 : msc_p->feed1().putColumn(mscIn_p->feed1().getColumn());
925 0 : msc_p->feed2().putColumn(mscIn_p->feed2().getColumn());
926 : os << LogIO::DEBUG1
927 : << "Copying FEED* took " << timer.real() << "s."
928 0 : << LogIO::POST;
929 :
930 0 : timer.mark();
931 0 : msc_p->exposure().putColumn(mscIn_p->exposure().getColumn());
932 : os << LogIO::DEBUG1
933 : << "Copying EXPOSURE took " << timer.real() << "s."
934 0 : << LogIO::POST;
935 :
936 0 : timer.mark();
937 0 : msc_p->flagRow().putColumn(mscIn_p->flagRow().getColumn());
938 : os << LogIO::DEBUG1
939 : << "Copying flagRow took " << timer.real() << "s."
940 0 : << LogIO::POST;
941 :
942 0 : timer.mark();
943 0 : msc_p->interval().putColumn(mscIn_p->interval().getColumn());
944 : os << LogIO::DEBUG1
945 : << "Copying INTERVAL took " << timer.real() << "s."
946 0 : << LogIO::POST;
947 :
948 0 : timer.mark();
949 0 : msc_p->scanNumber().putColumn(mscIn_p->scanNumber().getColumn());
950 : os << LogIO::DEBUG1
951 : << "Copying scanNumber took " << timer.real() << "s."
952 0 : << LogIO::POST;
953 :
954 0 : timer.mark();
955 0 : msc_p->time().putColumn(mscIn_p->time().getColumn());
956 : os << LogIO::DEBUG1
957 : << "Copying TIME took " << timer.real() << "s."
958 0 : << LogIO::POST;
959 :
960 0 : timer.mark();
961 0 : msc_p->timeCentroid().putColumn(mscIn_p->timeCentroid().getColumn());
962 : os << LogIO::DEBUG1
963 : << "Copying timeCentroid took " << timer.real() << "s."
964 0 : << LogIO::POST;
965 :
966 0 : timer.mark();
967 0 : msc_p->dataDescId().putColumn(mscIn_p->dataDescId().getColumn());
968 0 : msc_p->fieldId().putColumn(mscIn_p->fieldId().getColumn());
969 0 : msc_p->arrayId().putColumn(mscIn_p->arrayId().getColumn());
970 0 : msc_p->stateId().putColumn(mscIn_p->stateId().getColumn());
971 0 : msc_p->processorId().putColumn(mscIn_p->processorId().getColumn());
972 0 : msc_p->observationId().putColumn(mscIn_p->observationId().getColumn());
973 : os << LogIO::DEBUG1
974 : << "Copying DDID, FIELD, ARRAY_ID, STATE, PROC, and OBS took "
975 : << timer.real() << "s."
976 0 : << LogIO::POST;
977 :
978 : // ScalarMeasColumn doesn't have a putColumn() for some reason.
979 : //msc_p->uvwMeas().putColumn(mscIn_p->uvwMeas());
980 0 : timer.mark();
981 : //msc_p->uvw().putColumn(mscIn_p->uvw()); // 98s for 4.7e6 rows
982 :
983 : // 3.06s for 4.7e6 rows
984 : //RefRows refrows(0, nrows - 1);
985 : //msc_p->uvw().putColumnCells(refrows, mscIn_p->uvw().getColumn());
986 :
987 0 : msc_p->uvw().putColumn(mscIn_p->uvw().getColumn()); // 2.74s for 4.7e6 rows
988 : os << LogIO::DEBUG1
989 : << "Copying uvw took " << timer.real() << "s."
990 0 : << LogIO::POST;
991 :
992 0 : timer.mark();
993 0 : return true;
994 0 : }
995 :
996 0 : Bool Partition::fillMainTable(const Vector<MS::PredefinedColumns>& colNames)
997 : {
998 0 : LogIO os(LogOrigin("Partition", "fillMainTable()"));
999 0 : Bool success = true;
1000 0 : Timer timer;
1001 :
1002 0 : fillAccessoryMainCols();
1003 :
1004 : //Deal with data
1005 0 : ArrayColumn<Complex> data;
1006 0 : Vector<MS::PredefinedColumns> complexCols;
1007 0 : const Bool doFloat = SubMS::sepFloat(colNames, complexCols);
1008 0 : const uInt nDataCols = complexCols.nelements();
1009 0 : const Bool writeToDataCol = SubMS::mustConvertToData(nDataCols, complexCols);
1010 :
1011 : // timer.mark();
1012 : // msc_p->weight().putColumn(mscIn_p->weight());
1013 : // os << LogIO::DEBUG1
1014 : // << "Copying weight took " << timer.real() << "s."
1015 : // << LogIO::POST;
1016 : // timer.mark();
1017 : // msc_p->sigma().putColumn(mscIn_p->sigma());
1018 : // os << LogIO::DEBUG1
1019 : // << "Copying SIGMA took " << timer.real() << "s."
1020 : // << LogIO::POST;
1021 :
1022 : // timer.mark();
1023 : // msc_p->flag().putColumn(mscIn_p->flag());
1024 : // os << LogIO::DEBUG1
1025 : // << "Copying FLAG took " << timer.real() << "s."
1026 : // << LogIO::POST;
1027 :
1028 0 : os << LogIO::DEBUG1;
1029 0 : Bool didFC = false;
1030 0 : timer.mark();
1031 0 : if(!mscIn_p->flagCategory().isNull() &&
1032 0 : mscIn_p->flagCategory().isDefined(0)){
1033 0 : IPosition fcshape(mscIn_p->flagCategory().shape(0));
1034 0 : IPosition fshape(mscIn_p->flag().shape(0));
1035 :
1036 : // I don't know or care how many flag categories there are.
1037 0 : if(fcshape(0) == fshape(0) && fcshape(1) == fshape(1)){
1038 0 : msc_p->flagCategory().putColumn(mscIn_p->flagCategory());
1039 0 : os << "Copying FLAG_CATEGORY took " << timer.real() << "s.";
1040 0 : didFC = true;
1041 : }
1042 0 : }
1043 0 : if(!didFC)
1044 0 : os << "Deciding not to copy FLAG_CATEGORY took " << timer.real() << "s.";
1045 0 : os << LogIO::POST;
1046 :
1047 0 : timer.mark();
1048 0 : copyDataFlagsWtSp(complexCols, writeToDataCol);
1049 0 : if(doFloat)
1050 0 : msc_p->floatData().putColumn(mscIn_p->floatData());
1051 :
1052 : os << LogIO::DEBUG1
1053 : << "Total data read/write time = " << timer.real()
1054 0 : << LogIO::POST;
1055 :
1056 0 : return success;
1057 0 : }
1058 :
1059 0 : Bool Partition::getDataColumn(ArrayColumn<Complex>& data,
1060 : const MS::PredefinedColumns colName)
1061 : {
1062 0 : if(colName == MS::DATA)
1063 0 : data.reference(mscIn_p->data());
1064 0 : else if(colName == MS::MODEL_DATA)
1065 0 : data.reference(mscIn_p->modelData());
1066 0 : else if(colName == MS::LAG_DATA)
1067 0 : data.reference(mscIn_p->lagData());
1068 : else // The honored-by-time-if-nothing-else
1069 0 : data.reference(mscIn_p->correctedData()); // default.
1070 0 : return true;
1071 : }
1072 :
1073 0 : Bool Partition::getDataColumn(ArrayColumn<Float>& data,
1074 : const MS::PredefinedColumns colName)
1075 : {
1076 0 : LogIO os(LogOrigin("Partition", "getDataColumn()"));
1077 :
1078 0 : if(colName != MS::FLOAT_DATA)
1079 : os << LogIO::WARN
1080 : << "Using FLOAT_DATA (because it has type Float) instead of the requested "
1081 : << colName
1082 0 : << LogIO::POST;
1083 :
1084 0 : data.reference(mscIn_p->floatData());
1085 0 : return true;
1086 0 : }
1087 :
1088 0 : Bool Partition::putDataColumn(MSColumns& msc, ArrayColumn<Complex>& data,
1089 : const MS::PredefinedColumns colName,
1090 : const Bool writeToDataCol)
1091 : {
1092 0 : if(writeToDataCol || colName == MS::DATA)
1093 0 : msc.data().putColumn(data);
1094 0 : else if (colName == MS::MODEL_DATA)
1095 0 : msc.modelData().putColumn(data);
1096 0 : else if (colName == MS::CORRECTED_DATA)
1097 0 : msc.correctedData().putColumn(data);
1098 : //else if(colName == MS::FLOAT_DATA) // promotion from Float
1099 : // msc.floatData().putColumn(data); // to Complex is pvt?
1100 0 : else if(colName == MS::LAG_DATA)
1101 0 : msc.lagData().putColumn(data);
1102 : else
1103 0 : return false;
1104 0 : return true;
1105 : }
1106 :
1107 0 : Bool Partition::copyDataFlagsWtSp(const Vector<MS::PredefinedColumns>& colNames,
1108 : const Bool writeToDataCol)
1109 : {
1110 0 : Block<Int> columns;
1111 : // include scan and state iteration, for more optimal iteration
1112 0 : columns.resize(6);
1113 0 : columns[0]=MS::ARRAY_ID;
1114 0 : columns[1]=MS::SCAN_NUMBER;
1115 0 : columns[2]=MS::STATE_ID;
1116 0 : columns[3]=MS::FIELD_ID;
1117 0 : columns[4]=MS::DATA_DESC_ID;
1118 0 : columns[5]=MS::TIME;
1119 :
1120 : #ifdef COPYTIMER
1121 : Timer timer;
1122 : timer.mark();
1123 :
1124 : Vector<Int> inscan, outscan;
1125 : #endif
1126 :
1127 0 : ROVisIter viIn(mssel_p,columns,0.0);
1128 0 : VisIter viOut(msOut_p,columns,0.0);
1129 0 : viIn.setRowBlocking(1000);
1130 0 : viOut.setRowBlocking(1000);
1131 0 : Int iChunk(0), iChunklet(0);
1132 0 : Cube<Complex> data;
1133 0 : Cube<Bool> flag;
1134 :
1135 0 : Matrix<Float> wtmat;
1136 0 : viIn.originChunks(); // Makes me feel better.
1137 0 : const Bool doWtSp(viIn.existsWeightSpectrum());
1138 0 : Cube<Float> wtsp;
1139 :
1140 0 : uInt ninrows = mssel_p.nrow();
1141 : ProgressMeter meter(0.0, ninrows * 1.0, "partition", "rows copied", "", "",
1142 0 : true, 1);
1143 0 : uInt inrowsdone = 0; // only for the meter.
1144 :
1145 0 : for (iChunk=0,viOut.originChunks(),viIn.originChunks();
1146 0 : viOut.moreChunks(),viIn.moreChunks();
1147 0 : viOut.nextChunk(),viIn.nextChunk(),++iChunk) {
1148 0 : inrowsdone += viIn.nRowChunk();
1149 :
1150 : // The following can help evaluable in/out index alignment
1151 : /*
1152 : cout << "****iChunk=" << iChunk
1153 : << " scn: " << viIn.scan(inscan)(0) << "/" << viOut.scan(outscan)(0) << " "
1154 : << "fld: " << viIn.fieldId() << "/" << viOut.fieldId() << " "
1155 : << "ddi: " << viIn.dataDescriptionId() << "/" << viOut.dataDescriptionId() << " "
1156 : << "spw: " << viIn.spectralWindow() << "/" << viOut.spectralWindow() << " "
1157 : << endl;
1158 : */
1159 0 : for (iChunklet=0,viIn.origin(),viOut.origin();
1160 0 : viIn.more(),viOut.more();
1161 0 : viIn++,viOut++,++iChunklet) { //
1162 :
1163 : // cout << "nRows = " << viIn.nRow() << "/" << viOut.nRow() << endl;
1164 :
1165 : #ifdef COPYTIMER
1166 : timer.mark();
1167 : #endif
1168 0 : viIn.flag(flag);
1169 0 : viOut.setFlag(flag);
1170 0 : viIn.weightMat(wtmat);
1171 0 : viOut.setWeightMat(wtmat);
1172 0 : viIn.sigmaMat(wtmat); // Yes, I'm reusing wtmat.
1173 0 : viOut.setSigmaMat(wtmat);
1174 0 : if(doWtSp){
1175 0 : viIn.weightSpectrum(wtsp);
1176 0 : viOut.setWeightSpectrum(wtsp);
1177 : }
1178 :
1179 0 : for(Int colnum = colNames.nelements(); colnum--;){
1180 0 : if(writeToDataCol || colNames[colnum] == MS::DATA) {
1181 : // write DATA, MODEL_DATA, or CORRECTED_DATA to DATA
1182 0 : switch (colNames[colnum]) {
1183 0 : case MS::DATA:
1184 0 : viIn.visibility(data,VisibilityIterator::Observed);
1185 0 : break;
1186 0 : case MS::MODEL_DATA:
1187 0 : viIn.visibility(data,VisibilityIterator::Model);
1188 0 : break;
1189 0 : case MS::CORRECTED_DATA:
1190 0 : viIn.visibility(data,VisibilityIterator::Corrected);
1191 0 : break;
1192 0 : default:
1193 0 : throw(AipsError("Unrecognized input column!"));
1194 : break;
1195 : }
1196 0 : viOut.setVis(data,VisibilityIterator::Observed);
1197 : }
1198 0 : else if (colNames[colnum] == MS::MODEL_DATA) {
1199 : // write MODEL_DATA to MODEL_DATA
1200 0 : viIn.visibility(data,VisibilityIterator::Model);
1201 0 : viOut.setVis(data,VisibilityIterator::Model);
1202 : }
1203 0 : else if (colNames[colnum] == MS::CORRECTED_DATA) {
1204 : // write CORRECTED_DATA to CORRECTED_DATA
1205 0 : viIn.visibility(data,VisibilityIterator::Corrected);
1206 0 : viOut.setVis(data,VisibilityIterator::Corrected);
1207 : }
1208 : //else if(colNames[colnum] == MS::FLOAT_DATA) // TBD
1209 : // else if(colNames[colnum] == MS::LAG_DATA) // TBD
1210 : else
1211 0 : return false;
1212 : }
1213 :
1214 : #ifdef COPYTIMER
1215 : Double t=timer.real();
1216 : cout << "Chunk: " << iChunk << " " << iChunklet
1217 : << " scn: " << viIn.scan(inscan)(0) << "/" << viOut.scan(outscan)(0)
1218 : << " "
1219 : << " spw: " << viIn.spectralWindow() << "/" << viOut.spectralWindow()
1220 : << " : "
1221 : << data.nelements() << " cells = "
1222 : << data.nelements()*8.e-6 << " MB in "
1223 : << t << " sec, for " << data.nelements()*8.e-6/t << " MB/s"
1224 : << endl;
1225 : #endif
1226 :
1227 : }
1228 0 : meter.update(inrowsdone);
1229 : }
1230 0 : msOut_p.flush();
1231 0 : return true;
1232 0 : }
1233 :
1234 0 : Bool Partition::putDataColumn(MSColumns& msc, ArrayColumn<Float>& data,
1235 : const MS::PredefinedColumns colName,
1236 : const Bool writeToDataCol)
1237 : {
1238 0 : LogIO os(LogOrigin("Partition", "putDataColumn()"));
1239 :
1240 0 : if(writeToDataCol)
1241 : os << LogIO::NORMAL
1242 : << "Writing to FLOAT_DATA instead of DATA."
1243 0 : << LogIO::POST;
1244 :
1245 0 : if(colName == MS::FLOAT_DATA){
1246 0 : msc.floatData().putColumn(data);
1247 : }
1248 : else{
1249 : os << LogIO::SEVERE
1250 : << "Float data cannot be written to "
1251 : << MS::columnName(colName)
1252 0 : << LogIO::POST;
1253 0 : return false;
1254 : }
1255 0 : return true;
1256 0 : }
1257 :
1258 0 : Bool Partition::doTimeAver(const Vector<MS::PredefinedColumns>& dataColNames)
1259 : {
1260 0 : LogIO os(LogOrigin("Partition", "doTimeAver()"));
1261 :
1262 : os << LogIO::DEBUG1 // helpdesk ticket from Oleg Smirnov (ODU-232630)
1263 : << "Before msOut_p.addRow(): "
1264 0 : << Memory::allocatedMemoryInBytes() / (1024.0 * 1024.0) << " MB"
1265 0 : << LogIO::POST;
1266 :
1267 0 : Vector<MS::PredefinedColumns> cmplxColLabels;
1268 0 : const Bool doFloat = SubMS::sepFloat(dataColNames, cmplxColLabels);
1269 0 : const uInt nCmplx = cmplxColLabels.nelements();
1270 0 : if(doFloat && cmplxColLabels.nelements() > 0) // 2010-12-14
1271 : os << LogIO::WARN
1272 : << "Using VisibilityIterator to average both FLOAT_DATA and another DATA column is extremely experimental."
1273 0 : << LogIO::POST;
1274 :
1275 0 : ArrayColumn<Complex> *outCmplxCols = new ArrayColumn<Complex>[nCmplx];
1276 0 : getDataColMap(msc_p, outCmplxCols, nCmplx, cmplxColLabels);
1277 :
1278 : // We may need to watch for chunks (timebins) that should be split because of
1279 : // changes in scan, etc. (CAS-2401). The old split way would have
1280 : // temporarily shortened timeBin, but vi.setInterval() does not work without
1281 : // calling vi.originChunks(), so that approach does not work with
1282 : // VisibilityIterator. Instead, get VisibilityIterator's sort (which also
1283 : // controls how the chunks are split) to do the work.
1284 :
1285 : // Already separated by the chunking.
1286 : //const Bool watch_array(!combine_p.contains("arr")); // Pirate talk for "array".
1287 :
1288 0 : const Bool watch_scan(!combine_p.contains("scan"));
1289 0 : const Bool watch_state(!combine_p.contains("state"));
1290 0 : const Bool watch_obs(!combine_p.contains("obs"));
1291 0 : uInt n_cols_to_watch = 4; // At least.
1292 :
1293 0 : if(watch_scan)
1294 0 : ++n_cols_to_watch;
1295 0 : if(watch_state)
1296 0 : ++n_cols_to_watch;
1297 0 : if(watch_obs)
1298 0 : ++n_cols_to_watch;
1299 :
1300 0 : Block<Int> sort(n_cols_to_watch);
1301 0 : uInt colnum = 1;
1302 :
1303 0 : sort[0] = MS::ARRAY_ID;
1304 0 : if(watch_scan){
1305 0 : sort[colnum] = MS::SCAN_NUMBER;
1306 0 : ++colnum;
1307 : }
1308 0 : if(watch_state){
1309 0 : sort[colnum] = MS::STATE_ID;
1310 0 : ++colnum;
1311 : }
1312 0 : sort[colnum] = MS::FIELD_ID;
1313 0 : ++colnum;
1314 0 : sort[colnum] = MS::DATA_DESC_ID;
1315 0 : ++colnum;
1316 0 : sort[colnum] = MS::TIME;
1317 0 : ++colnum;
1318 0 : if(watch_obs)
1319 0 : sort[colnum] = MS::OBSERVATION_ID;
1320 :
1321 : // MSIter tends to produce output INTERVALs that are longer than the
1322 : // requested interval length, by ~0.5 input integrations for a random
1323 : // timeBin_p. Giving it timeBin_p - 0.5 * interval[0] removes the bias and
1324 : // brings it almost in line with binTimes() (which uses -0.5 *
1325 : // interval[bin_start]).
1326 : //
1327 : // late April 2011: MSIter removed the bias, which threw off the correction.
1328 : //
1329 0 : ROVisibilityIterator vi(mssel_p, sort,
1330 : //timeBin_p - 0.5 * mscIn_p->interval()(0));
1331 0 : timeBin_p);
1332 : //vi.slurp();
1333 : //cerr << "Finished slurping." << endl;
1334 :
1335 0 : const Bool doSpWeight = vi.existsWeightSpectrum();
1336 :
1337 : //os << LogIO::NORMAL2 << "outNrow = " << msOut_p.nrow() << LogIO::POST;
1338 0 : uInt rowsdone = 0; // Output rows, used for the RefRows.
1339 :
1340 0 : uInt ninrows = mssel_p.nrow();
1341 : ProgressMeter meter(0.0, ninrows * 1.0, "partition", "rows averaged", "", "",
1342 0 : true, 1);
1343 0 : uInt inrowsdone = 0; // only for the meter.
1344 :
1345 0 : VisChunkAverager vca(dataColNames, doSpWeight);
1346 :
1347 : // Iterate through the chunks. A timebin will have multiple chunks if it has
1348 : // > 1 arrays, fields, or ddids.
1349 0 : for(vi.originChunks(); vi.moreChunks(); vi.nextChunk()){
1350 0 : vca.reset(); // Should be done at the start of each chunk.
1351 :
1352 0 : inrowsdone += vi.nRowChunk();
1353 :
1354 : // Fill and time average vi's current chunk.
1355 0 : VisBuffer& avb(vca.average(vi));
1356 0 : uInt rowsnow = avb.nRow();
1357 :
1358 0 : if(rowsnow > 0){
1359 0 : RefRows rowstoadd(rowsdone, rowsdone + rowsnow - 1);
1360 :
1361 : // msOut_p.addRow(rowsnow, true);
1362 0 : msOut_p.addRow(rowsnow); // Try it without initialization.
1363 :
1364 : // relabelIDs();
1365 :
1366 : // avb.freqAveCubes(); // Watch out, weight must currently be handled separately.
1367 :
1368 : // // Fill in the nonaveraging values from slotv0.
1369 0 : msc_p->antenna1().putColumnCells(rowstoadd, avb.antenna1());
1370 0 : msc_p->antenna2().putColumnCells(rowstoadd, avb.antenna2());
1371 :
1372 0 : Vector<Int> arrID(rowsnow);
1373 0 : arrID.set(avb.arrayId());
1374 0 : msc_p->arrayId().putColumnCells(rowstoadd, arrID);
1375 :
1376 : // outCmplxCols determines whether the input column is output to DATA or not.
1377 0 : for(uInt datacol = 0; datacol < nCmplx; ++datacol){
1378 0 : if(dataColNames[datacol] == MS::DATA)
1379 0 : outCmplxCols[datacol].putColumnCells(rowstoadd, avb.visCube());
1380 0 : else if(dataColNames[datacol] == MS::MODEL_DATA)
1381 0 : outCmplxCols[datacol].putColumnCells(rowstoadd, avb.modelVisCube());
1382 0 : else if(dataColNames[datacol] == MS::CORRECTED_DATA)
1383 0 : outCmplxCols[datacol].putColumnCells(rowstoadd, avb.correctedVisCube());
1384 : }
1385 0 : if(doFloat)
1386 0 : msc_p->floatData().putColumnCells(rowstoadd, avb.floatDataCube());
1387 :
1388 0 : Vector<Int> ddID(rowsnow);
1389 0 : ddID.set(avb.dataDescriptionId());
1390 0 : msc_p->dataDescId().putColumnCells(rowstoadd, ddID);
1391 :
1392 0 : msc_p->exposure().putColumnCells(rowstoadd, avb.exposure());
1393 0 : msc_p->feed1().putColumnCells(rowstoadd, avb.feed1());
1394 0 : msc_p->feed2().putColumnCells(rowstoadd, avb.feed2());
1395 :
1396 0 : Vector<Int> fieldID(rowsnow);
1397 0 : fieldID.set(avb.fieldId());
1398 0 : msc_p->fieldId().putColumnCells(rowstoadd, fieldID);
1399 :
1400 0 : msc_p->flagRow().putColumnCells(rowstoadd, avb.flagRow());
1401 0 : msc_p->flag().putColumnCells(rowstoadd, avb.flagCube());
1402 0 : msc_p->interval().putColumnCells(rowstoadd, avb.timeInterval());
1403 :
1404 0 : msc_p->observationId().putColumnCells(rowstoadd, avb.observationId());
1405 0 : msc_p->processorId().putColumnCells(rowstoadd, avb.processorId());
1406 :
1407 0 : msc_p->scanNumber().putColumnCells(rowstoadd, avb.scan());
1408 0 : msc_p->sigma().putColumnCells(rowstoadd, avb.sigmaMat());
1409 :
1410 0 : msc_p->stateId().putColumnCells(rowstoadd, avb.stateId());
1411 :
1412 0 : msc_p->time().putColumnCells(rowstoadd, avb.time());
1413 0 : msc_p->timeCentroid().putColumnCells(rowstoadd, avb.timeCentroid());
1414 0 : msc_p->uvw().putColumnCells(rowstoadd, avb.uvwMat());
1415 0 : msc_p->weight().putColumnCells(rowstoadd, avb.weightMat());
1416 0 : if(doSpWeight)
1417 0 : msc_p->weightSpectrum().putColumnCells(rowstoadd,
1418 0 : avb.weightSpectrum());
1419 0 : rowsdone += rowsnow;
1420 0 : }
1421 0 : meter.update(inrowsdone);
1422 : } // End of for(vi.originChunks(); vi.moreChunks(); vi.nextChunk())
1423 0 : delete [] outCmplxCols;
1424 0 : os << LogIO::NORMAL << "Data binned." << LogIO::POST;
1425 :
1426 : //const ColumnDescSet& cds = mssel_p.tableDesc().columnDescSet();
1427 : //const ColumnDesc& cdesc = cds[MS::columnName(MS::DATA)];
1428 : //ROTiledStManAccessor tacc(mssel_p, cdesc.dataManagerGroup());
1429 : //tacc.showCacheStatistics(cerr); // A 99.x% hit rate is good. 0% is bad.
1430 :
1431 : os << LogIO::DEBUG1 // helpdesk ticket in from Oleg Smirnov (ODU-232630)
1432 : << "Post binning memory: "
1433 0 : << Memory::allocatedMemoryInBytes() / (1024.0 * 1024.0) << " MB"
1434 0 : << LogIO::POST;
1435 :
1436 0 : if(rowsdone < 1){
1437 : os << LogIO::WARN
1438 : << "No rows were written. Is all the selected input flagged?"
1439 0 : << LogIO::POST;
1440 0 : return false;
1441 : }
1442 0 : return true;
1443 0 : }
1444 :
1445 0 : void Partition::getDataColMap(MSMainColumns* msc, ArrayColumn<Complex>* mapper,
1446 : uInt ntok,
1447 : const Vector<MS::PredefinedColumns>& colEnums)
1448 : {
1449 : // Set up a map from dataColumn indices to ArrayColumns in the output.
1450 : // mapper has to be a pointer (gasp!), not a Vector, because
1451 : // Vector<ArrayColumn<Complex> > mapper(ntok) would implicitly call
1452 : // .resize(), which uses =, which is banned for ArrayColumn.
1453 :
1454 0 : if(SubMS::mustConvertToData(ntok, colEnums)){
1455 0 : mapper[0].reference(msc->data());
1456 : }
1457 : else{
1458 0 : for(uInt i = 0; i < ntok; ++i){
1459 0 : if(colEnums[i] == MS::CORRECTED_DATA)
1460 0 : mapper[i].reference(msc->correctedData());
1461 0 : else if(colEnums[i] == MS::MODEL_DATA)
1462 0 : mapper[i].reference(msc->modelData());
1463 0 : else if(colEnums[i] == MS::LAG_DATA)
1464 0 : mapper[i].reference(msc->lagData());
1465 : else // The output default !=
1466 0 : mapper[i].reference(msc->data()); // the input default.
1467 : }
1468 : }
1469 0 : }
1470 :
1471 : } //#End casa namespace
|