Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/parallel/parallel_data_helper.py: 77%
793 statements
coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1#!/usr/bin/env python
2import os
3import re
4import shutil
5import string
6import copy
7import time
8import subprocess
12from .parallel_task_helper import ParallelTaskHelper, JobData
13from .. import partitionhelper as ph
14from casatools import quanta, ms, msmetadata, mstransformer, table
15from casatasks import casalog
17_qa = quanta()
19# common function to use to get a dictionary item iterator
20def lociteritems(adict):
21 return adict.items()
23"""
24ParallelDataHelper is a class to process Multi-MS. It can process the MMS
25as input and as an output. When the input is an MMS, this class will create
26parallel jobs for each subMS and execute them using ParallelTaskHelper.
27When the purpose is to create an output MMS, this class will work the heuristics
28to partition the input MS based on the given separation axis and the transformations
29performed by the client task.
31There are two types of tasks that may use this class:
321) tasks that read an input MMS and create an output MMS with the same
33 structure of the input (similar partition axis). Ex: split2
352) tasks that do 1) and can also create an output MMS from an input MS.
36 These tasks have a parameter createmms with several sub-parameters.
37 Ex: partition and mstransform.
39In order to call ParallelDataHelper from a task, a few things are needed:
40See examples in task_mstransform, task_partition.py, task_split2 or task_hanningsmooth2.py.
42 * It is assumed that the client task has data selection parameters, or at least
43 spw and scan parameters.
45 * ParallelDataHelper will override the ParallelTaskHelper methods:
46 isParallelMS(), initialize(), generateJobs() and postExecution().
48 1) How to use ParallelDataHelper to process an input MMS (similar to split2)
49 and create an output MMS with the same parallel structure of the input.
51 from parallel.parallel_data_helper import ParallelDataHelper
53 # Initiate the helper class
54 pdh = ParallelDataHelper('taskname', locals())
56 # Validate input and output parameters
57 pdh.setupIO()
59 # To read and work with input MMS (the output will be an MMS too)
60 if pdh.isParallelMS() and keepmms==True:
62 # validate some parameters. In some cases the MMS needs to be
63 # treated as a monolithic MS. In these cases, use mstransform instead
64 pdh.validateInputParams()
66 # Get a cluster
67 pdh.setupCluster('taskname')
69 # run the jobs in parallel
70 try:
71 pdh.go()
72 except Exception as instance:
73 casalog.post('%s'%instance,'ERROR')
74 return False
76 return True
78 2) How to use ParallelDataHelper to create an output MMS (similar to partition)
80 from parallel.parallel_data_helper import ParallelDataHelper
82 # Initiate the helper class
83 pdh = ParallelDataHelper('taskname', locals())
85 # Validate input and output parameters
86 pdh.setupIO()
88 if createmms==True:
90 # Get a cluster
91 pdh.setupCluster('taskname')
93 try:
94 pdh.go()
95 except Exception as instance:
96 casalog.post('%s'%instance,'ERROR')
97 return False
99 return True
100"""
103class ParallelDataHelper(ParallelTaskHelper):
105 def __init__(self, thistask, args={}):
106 self.__args = dict(args)
108 self.__taskname = thistask
110 self.__selectionScanList = None
111 self.__selectionBaselineList = None
112 self.__ddistart = None
113 self._msTool = None
114 self._tbTool = None
116 if not 'spw' in self.__args:
117 self.__args['spw'] = ''
119 if not 'scan' in self.__args:
120 self.__args['scan'] = ''
122 self.__spwSelection = self.__args['spw']
123 self.__spwList = None
124 # __ddidict contains the names of the subMSs to consolidate.
125 # The keys are the ddistart and the values the subMSs names
126 self.__ddidict = {}
128 # Start parameter for DDI in main table of each sub-MS.
129 # It should be a counter of spw IDs starting at 0
130 if 'ddistart' in self.__args:
131 self.__ddistart = self.__args['ddistart']
133 def setTaskName(self, thistask=''):
134 self.__taskname = thistask
136 def setupIO(self):
137 """ Validate input and output parameters """
139 if isinstance(self.__args['vis'], str):
140 if not os.path.exists(self.__args['vis']):
141 raise IOError('Visibility data set not found - please verify the name.')
143 if isinstance(self.__args['outputvis'], str):
144 # only one output MS
145 if self.__args['outputvis'].isspace() or self.__args['outputvis'].__len__() == 0:
146 raise IOError('Please specify outputvis.')
148 elif os.path.exists(self.__args['outputvis']):
149 raise IOError("Output MS %s already exists - will not overwrite it."%self.__args['outputvis'])
151 flagversions = self.__args['outputvis']+".flagversions"
152 if os.path.exists(flagversions):
153 raise RuntimeError('The flagversions {} for the output MS already exists. Please'
154 ' delete it.'.format(flagversions))
156 return True
158 def validateInputParams(self):
159 """ This method should run before setting up the cluster to work all the
160 heuristics associated with the input MMS and the several
161 transformations that the task does.
162 The parameters are validated based on the client task.
163 This method must use the self.__args parameters from the local class.
165 This method will determine if the task can process the MMS in parallel
166 or not.
168 It returns a dictionary of the following:
169 retval{'status': True, 'axis':''} --> can run in parallel
170 retval{'status': False, 'axis':'value'} --> treat MMS as monolithic MS, set axis of output MMS
171 retval{'status': False, 'axis':''} --> treat MMS as monolithic MS, create output MS
173 The new separation axis for the output can be: scan, spw or auto.
175 """
176 # Return dictionary
177 retval = {}
178 retval['status'] = True
179 retval['axis'] = ''
181 # Get the separationaxis of input MMS.
182 sepaxis = ph.axisType(self.__args['vis'])
183 if sepaxis.isspace() or sepaxis.__len__() == 0:
184 sepaxis = 'unknown'
185 elif sepaxis == 'scan,spw':
186 sepaxis = 'auto'
188 #Get list of subMSs in MMS
189 subMSList = ParallelTaskHelper.getReferencedMSs(self.__args['vis'])
191 if self.__taskname == "mstransform":
193 if (self.__args['combinespws'] == True or self.__args['nspw'] > 1) and \
194 (self.__args['timeaverage'] == False):
195 spwsel = self.__getSpwIds(self.__args['vis'], self.__args['spw'])
196 # Get dictionary with spwids of all subMS in the MMS
197 spwdict = ph.getScanSpwSummary(subMSList)
198 # For each subMS, check if it has the spw selection
199 for subms in subMSList:
200 subms_spwids = ph.getSubMSSpwIds(subms, spwdict)
201 slist = map(str,subms_spwids)
202 # Check if the subms contains all the selected spws
203 if not self.__isSpwContained(spwsel, slist):
204 casalog.post('Cannot combine or separate spws in parallel because the subMSs do not contain all the selected spws',\
205 'WARN')
206 # Set the new separation axis for the output
207 retval['status'] = False
208 retval['axis'] = 'scan'
209 break
211 elif (self.__args['timeaverage'] == True and self.__args['timespan'] == 'scan') and \
212 (self.__args['combinespws'] == False and self.__args['nspw'] == 1):
213 # Get the value of timebin as a float
214 timebin = self.__args['timebin']
215 tsec = _qa.quantity(timebin,'s')['value']
216 scansel = self.__getScanIds(self.__args['vis'], self.__args['scan'])
217 # For each subms, check if scans length is <= timebin
218 for subms in subMSList:
219 if not self.__isScanContained(subms, scansel, tsec):
220 casalog.post('Cannot process MMS in parallel when timespan=\'scan\' because the subMSs do not contain all the selected scans',\
221 'WARN')
222 # Set the new separation axis for the output
223 retval['status'] = False
224 retval['axis'] = 'spw'
225 break
227 # Two transformations are requested.
228 elif (self.__args['combinespws'] == True or self.__args['nspw'] > 1) and \
229 (self.__args['timeaverage'] == True and self.__args['timespan'] == 'scan'):
230 # Check spws and scans in subMSs
231 spwsel = self.__getSpwIds(self.__args['vis'], self.__args['spw'])
232 spwdict = ph.getScanSpwSummary(subMSList)
233 scansel = self.__getScanIds(self.__args['vis'], self.__args['scan'])
234 timebin = self.__args['timebin']
235 tsec = _qa.quantity(timebin,'s')['value']
236 for subms in subMSList:
237 subms_spwids = ph.getSubMSSpwIds(subms, spwdict)
238 slist = map(str,subms_spwids)
239 if self.__isSpwContained(spwsel, slist):
240 if not self.__isScanContained(subms, scansel, tsec):
241 casalog.post('The subMSs of input MMS do not contain the necessary scans','WARN')
242 retval['status'] = False
243 retval['axis'] = ''
244 break
245 else:
246 casalog.post('The subMSs of input MMS do not contain the necessary spws','WARN')
247 retval['status'] = False
248 retval['axis'] = ''
249 break
252 elif self.__taskname == "split2" or self.__taskname == "split":
253 if (sepaxis != 'spw' and self.__args['combine'] == 'scan'):
254 scansel = self.__getScanIds(self.__args['vis'], self.__args['scan'])
255 timebin = self.__args['timebin']
256 tsec = _qa.quantity(timebin,'s')['value']
257 for subms in subMSList:
258 if not self.__isScanContained(subms, scansel, tsec):
259 casalog.post('Cannot process MMS in parallel when combine=\'scan\' because the subMSs do not contain all the selected scans',\
260 'WARN')
261 casalog.post("Please set keepmms to False or use task mstransform in this case.",'ERROR')
262 retval['status'] = False
263 retval['axis'] = ''
264 break
266 elif self.__taskname == "cvel2" and sepaxis != 'scan':
267 spwsel = self.__getSpwIds(self.__args['vis'], self.__args['spw'])
268 spwdict = ph.getScanSpwSummary(subMSList)
269 for subms in subMSList:
270 subms_spwids = ph.getSubMSSpwIds(subms, spwdict)
271 slist = map(str,subms_spwids)
272 # Check if the subms contains all the selected spws
273 if not self.__isSpwContained(spwsel, slist):
274 casalog.post('Cannot combine spws in parallel because the subMSs do not contain all the selected spws',\
275 'WARN')
276 casalog.post("Please set keepmms to False or use task mstransform in this case.",'ERROR')
277 # Set the new separation axis for the output
278 retval['status'] = False
279 retval['axis'] = ''
280 break
283 return retval
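    # For illustration only: a hypothetical client task could interpret the
    # returned dictionary as sketched below (placeholder code, not from the
    # actual tasks):
    #
    #   pval = pdh.validateInputParams()
    #   if not pval['status']:
    #       # treat the MMS as a monolithic MS; pval['axis'] suggests the
    #       # separation axis for the output MMS ('' means create a plain MS)
    #       pdh.override_arg('createmms', False)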
285 def __getSpwIds(self, msfile, spwsel):
286 """Get the spw IDs of the spw selection
287 Keyword arguments
288 msfile -- MS or MMS name
289 spwsel -- spw selection
291 It will remove the channels from the selection and return only the spw ids.
292 """
293 myspwsel = spwsel
294 if myspwsel.isspace() or myspwsel.__len__() == 0:
295 myspwsel = '*'
297 spwlist = []
298 msTool = ms( )
299 try:
300 seldict = msTool.msseltoindex(vis=msfile,spw=myspwsel)
301 except:
302 return spwlist
304 spwids = list(set(seldict['spw']))
305 spwlist = map(str,spwids)
307 del msTool
308 return spwlist
310 def __isSpwContained(self, spwlist, subms_spws):
311 """ Return True if the subMS contains the spw selection or False otherwise.
312 Keyword arguments:
313 spwlist -- list of selected spwids in MMS, e.g. ['0','1']. Do not include channels
314 subms_spws -- list of spwids in subMS
315 """
317 isSelected = False
319 # Check if the selected spws are in the subMS
320 if set(spwlist) <= set(subms_spws):
321 isSelected = True
323 return isSelected
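    # Worked example, for illustration: with spwlist=['0','1'] and
    # subms_spws=['0','1','2'], set(['0','1']) <= set(['0','1','2']) evaluates
    # to True, so the subMS is considered to contain the spw selection.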
325 def __getScanIds(self, msfile, scansel):
326 """ Get the scan IDs of the scan selection.
327 Keyword arguments:
328 msfile -- MS or MMS name
329 scansel -- scan selection
331 Returns a list of the scan IDs (list of strings) or [] in case of failure.
332 """
333 scanlist = []
334 if scansel.isspace() or scansel.__len__() == 0:
335 # Get all the scan ids
336 mymsmd = msmetadata( )
337 mymsmd.open(msfile)
338 scans = mymsmd.scannumbers()
339 mymsmd.close()
340 scanlist = map(str,scans)
341 else:
342 myms = ms()
343 try:
344 myms.open(msfile)
345 myms.msselect({'scan':scansel})
346 scans = myms.msselectedindices()['scan']
347 scanlist = map(str,scans)
348 except:
349 scanlist = []
350 finally:
351 myms.close()
353 return scanlist
355 def __isScanContained(self, subms, scanlist, tbin):
356 """ Check if subMS contains all the selected scans
357 and if the duration of the subMS scans is larger than or
358 equal to the timebin.
360 Keyword arguments:
361 subms -- subMS name
362 scanlist -- list with selected scans for the MMS
363 tbin -- timebin as a Float
365 Returns True on success, False otherwise.
366 """
367 isContained = False
369 mymsmd = msmetadata( )
370 mymsmd.open(subms)
372 # Check if subms scans contain all selected scans
373 hasScans = False
374 s = mymsmd.scannumbers()
375 subms_scans = map(str, s)
376 if set(scanlist) <= set(subms_scans):
377 hasScans = True
379 if hasScans:
380 t = mymsmd.timesforscans(s)
381 mymsmd.close()
382 t_range = t.max() - t.min()
384 if t_range >= tbin:
385 isContained = True
387 return isContained
389 def validateOutputParams(self):
390 """ This method should run before setting up the cluster to work all the
391 heuristics associated with the separationaxis and the several
392 transformations that the task does.
393 This method must use the local class self.__args parameters
394 """
396 # success
397 retval = 1
398 if not 'separationaxis' in self.__args:
399 return retval
401 else:
402 sepaxis = self.__args['separationaxis']
404 # Task mstransform
405 if self.__taskname == "mstransform":
406 if sepaxis != 'scan' and (self.__args['combinespws'] == True or self.__args['nspw'] > 1):
407 casalog.post('Cannot partition MS per spw or auto when combinespws = True or nspw > 1', 'WARN')
408 retval = 0
410 elif sepaxis != 'spw' and self.__args['timespan'] == 'scan':
411 casalog.post('Time averaging across scans may lead to wrong results when separation axis is not spw', 'WARN')
413 return retval
415 @staticmethod
416 def isMMSAndNotServer(vis):
417 """
418 Is vis a Multi-MS, and I am not an MPI server?
419 The return value is used to know if sub-MS (sub-task) commands should be dispatched
420 to servers in parallel (MPI) mode.
422 :param vis: an MS
423 """
424 # This checks the "NotServer" condition. If I'm a server, no need to check more
425 # if MPIEnvironment.is_mpi_enabled and not MPIEnvironment.is_mpi_client:
426 if ParallelTaskHelper.isMPIEnabled() and not ParallelTaskHelper.isMPIClient():
427 return False
429 # Note: using the ParallelTaskHelper version which honors the __bypass_parallel trick
430 return ParallelTaskHelper.isParallelMS(vis)
432 @staticmethod
433 def isParallelMS(vis):
434 """ This method will read the value of SubType in table.info
435 of the Multi-MS or MS.
436 NOTE: this method is duplicated in parallel_task_helper, with the addition of
437 a check on "ParallelTaskHelper.__bypass_parallel_processing".
439 Keyword arguments:
440 vis -- name of MS or Multi-MS
442 It returns True if SubType is CONCATENATED, False otherwise.
443 This method overrides the one from ParallelTaskHelper.
444 """
446 msTool = ms( )
447 if not msTool.open(vis):
448 raise ValueError("Unable to open MS %s." % vis)
449 rtnVal = msTool.ismultims() and \
450 isinstance(msTool.getreferencedtables(), list)
452 msTool.close()
453 return rtnVal
455 def override_arg(self,arg,value):
456 """ Override a parameter value in ParallelDataHelper arguments
458 Keyword arguments:
459 arg -- name of the parameter
460 value -- value of the parameter
462 It is usually used for the outputvis or createmms parameters.
463 """
464 self.__args[arg] = value
466 def setupCluster(self, thistask=''):
467 """ Get a cluster
469 Keyword argument:
470 thistask -- the task calling this class
472 ParallelTaskHelper will populate its self._arg dictionary with
473 a copy of the parameters of the calling task, which have been
474 set in self.__args.
475 """
477 # It needs to use the updated list of parameters!!!
478 ParallelTaskHelper.__init__(self, task_name=thistask, args=self.__args)
480 def setupParameters(self, **pars):
481 """ Create a dictionary with non-empty parameters
483 Keyword argument:
484 **pars -- a dictionary with key:value pairs
486 It will return a dictionary with only non-empty parameters.
487 """
489 seldict = {}
490 for k,v in pars.items():
491 if v != None and v != "":
492 seldict[k] = v
494 return seldict
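    # Worked example, for illustration: setupParameters(field='3C286', spw='',
    # scan='1~3', antenna=None) returns {'field': '3C286', 'scan': '1~3'},
    # since empty strings and None values are dropped.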
496 def validateModelCol(self):
497 """ Add the realmodelcol parameter to the configuration
498 only for some values of datacolumn. Specific for mstransform.
499 This method must use the local class self.__args parameters.
500 """
502 ret = False
504 dc = self.__args['datacolumn'].upper()
505 if "MODEL" in dc or dc == 'ALL':
506 ret = True
508 return ret
510 def initialize(self):
511 """Initializes some parts of the cluster setup.
512 Add the full path for the input and output MS.
513 Creates the temporary directory to save each
514 parallel subMS. The directory is called
515 <outputvis>.data.
516 This method overrides the one from ParallelTaskHelper.
517 """
519 casalog.origin("ParallelDataHelper")
521 # self._arg is populated inside ParallelTaskHelper._init_()
522 self._arg['vis'] = os.path.abspath(self._arg['vis'])
523 # MPI setting
524 if self._mpi_cluster:
525 self._cluster.start_services()
527 if (self._arg['outputvis'] != ""):
528 self._arg['outputvis'] = os.path.abspath(self._arg['outputvis'])
530 outputPath, self.outputBase = os.path.split(self._arg['outputvis'])
531 try:
532 if self.outputBase[-1] == '.':
533 self.outputBase = self.outputBase[:self.outputBase.rindex('.')]
534 except ValueError:
535 # outputBase must not have a trailing .
536 pass
538 if self.outputBase == '.' or self.outputBase == './':
539 raise ValueError('Error dealing with outputvis')
541 # The subMS are first saved inside a temporary directory
542 self.dataDir = outputPath + '/' + self.outputBase+'.data'
543 if os.path.exists(self.dataDir):
544 shutil.rmtree(self.dataDir)
546 os.mkdir(self.dataDir)
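    # Worked example, for illustration: with outputvis='/data/out.mms' the
    # temporary directory becomes '/data/out.mms.data'; the subMSs are first
    # written there and assembled into the final MMS in postExecution().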
548 def generateJobs(self):
549 """ This is the method which generates all of the actual jobs to be done.
550 This method overrides the one in ParallelTaskHelper baseclass.
551 """
553 casalog.origin("ParallelDataHelper")
554 casalog.post("Analyzing MS for partitioning")
555 if ParallelDataHelper.isParallelMS(self._arg['vis']):
556 casalog.post("Input vis is a Multi-MS")
559 # Input MMS, processed in parallel; output is an MMS
560 # For tasks such as split2, hanningsmooth2
561 if ParallelDataHelper.isParallelMS(self._arg['vis']) and (not 'monolithic_processing' in self._arg):
562 self.__createNoSeparationCommand()
564 # For mstransform when processing input MMS in parallel
565 elif ParallelDataHelper.isParallelMS(self._arg['vis']) and self._arg['monolithic_processing'] == False:
566 self.__createNoSeparationCommand()
568 # For tasks that create an output MMS. In these cases
569 # input can be an MMS processed monolithically or an input MS
570 elif self._arg['createmms']:
571 self.__createPrimarySplitCommand()
573 return True
575 def __createNoSeparationCommand(self):
576 """ Add commands to be executed by the engines when input is an MMS.
577 This method overrides the following parameter:
578 self._arg['createmms']
579 """
581 submslist = ParallelTaskHelper.getReferencedMSs(self._arg['vis'])
582 if len(submslist) == 0:
583 raise ValueError('There are no subMSs in input vis')
585 tbTool = table( )
587 listOutputMS = []
589 subMs_idx = 0
590 for subMS in submslist:
592 # make sure the SORTED_TABLE keywords are disabled
593 tbTool.open(subMS, nomodify=False)
594 if 'SORTED_TABLE' in tbTool.keywordnames():
595 tobeDeleted = tbTool.getkeyword('SORTED_TABLE').split(' ')[1]
596 tbTool.removekeyword('SORTED_TABLE')
597 os.system('rm -rf '+tobeDeleted)
599 tbTool.close()
601 listOutputMS.append(self.dataDir+'/%s.%04d.ms' \
602 % (self.outputBase, subMs_idx))
603 subMs_idx += 1
605 # Override the original parameters
606 self.override_arg('outputvis',listOutputMS)
608 self._consolidateOutput = False
610 # Add to the list of jobs to execute
611 subMs_idx = 0
612 for subMS in submslist:
613 localArgs = copy.copy(self._arg)
614 localArgs['vis'] = subMS
615 for key in self._arguser:
616 localArgs[key] = self._arguser[key][subMs_idx]
618 if 'createmms' in self._arg:
619 self._arg['createmms'] = False
620 localArgs['createmms'] = False
622 subMs_idx += 1
623 if not self._mpi_cluster:
624 self._executionList.append(JobData(self._taskName, localArgs))
625 else:
626 self._executionList.append([self._taskName + '()',localArgs])
628 def __createPrimarySplitCommand(self):
629 """ This method overwrites the following parameter:
630 self._arg['separationaxis'] when running the monolithic case
631 """
633 if self._arg['createmms']:
635 if self._arg['separationaxis'].lower() == 'scan':
636 self.__createScanSeparationCommands()
637 elif self._arg['separationaxis'].lower() == 'spw':
638 self.__createSPWSeparationCommands()
639 elif self._arg['separationaxis'].lower() == 'baseline':
640 self.__createBaselineSeparationCommands()
641 elif self._arg['separationaxis'].lower() == 'auto':
642 self.__createBalancedSeparationCommands()
643 else:
644 # Use a default
645 self.__createDefaultSeparationCommands()
647 def __createScanSeparationCommands(self):
648 """ This method is to generate a list of commands to partition
649 the data based on scan.
650 """
652 scanList = self.__selectionScanList
653 if scanList is None:
654 self.__selectMS()
655 scanList = self.__getScanList()
657 # Make sure we have enough scans to create the needed number of
658 # subMSs. If not change the total expected.
659 numSubMS = self._arg['numsubms']
660 if isinstance(numSubMS,str) and numSubMS == 'auto':
661 # Create the best load balance based on the number of nodes
662 numSubMS = self.getNumberOfServers()
663 if numSubMS == None:
664 numSubMS = 8
665 numSubMS = min(len(scanList),numSubMS)
667 partitionedScans = self.__partition(scanList, numSubMS)
668 for output in range(numSubMS):
669 mmsCmd = copy.copy(self._arg)
670 mmsCmd['createmms'] = False
671 mmsCmd['scan']= ParallelTaskHelper.\
672 listToCasaString(partitionedScans[output])
673 mmsCmd['outputvis'] = self.dataDir+'/%s.%04d.ms' \
674 % (self.outputBase, output)
675 if not self._mpi_cluster:
676 self._executionList.append(JobData(self._taskName, mmsCmd))
677 else:
678 self._executionList.append([self._taskName + '()',mmsCmd])
680 def __createSPWSeparationCommands(self):
681 """ This method is to generate a list of commands to partition
682 the data based on spw.
683 """
685 # Get a unique list of selected spws
686 self.__selectMS()
687 spwList = self.__getSPWUniqueList()
688 numSubMS = self._arg['numsubms']
690 if isinstance(numSubMS,str) and numSubMS == 'auto':
691 # Create the best load balance based on the number of nodes
692 numSubMS = self.getNumberOfServers()
693 if numSubMS == None:
694 numSubMS = 8
695 numSubMS = min(len(spwList),numSubMS)
697 # Get a dictionary of the spws parted for each subMS, with IDs as strings
698 spwList = list(map(str,spwList))
699 partitionedSPWs1 = self.__partition1(spwList,numSubMS)
701 # Add the channel selections back to the spw expressions
702 newspwsel = self.__createSPWExpression(partitionedSPWs1)
704 # Validate the chanbin parameter
705 validbin = False
706 parname = self.getChanAvgParamName()
707 if self.validateChanBin():
708 if isinstance(self._arg[parname], list):
709 matched_chanbins = ParallelDataHelper.__get_matched_chanbins(
710 self.__args['spw'], spwList, self._arg[parname])
711 freqbinlist = self.__partition1(matched_chanbins, numSubMS)
712 validbin = True
714 # Calculate the ddistart for each engine. This will be used
715 # to calculate the DD IDs of the output main table of the subMSs
716 ddistartlist = self.__calculateDDIstart({}, partitionedSPWs1)
717 if (len(ddistartlist) != len(partitionedSPWs1)):
718 casalog.post('Error calculating the ddistart indices','SEVERE')
719 raise
721 for output in range(numSubMS):
722 mmsCmd = copy.copy(self._arg)
723 mmsCmd['createmms'] = False
724 if self.__selectionScanList is not None:
725 mmsCmd['scan'] = ParallelTaskHelper.\
726 listToCasaString(self.__selectionScanList)
727 mmsCmd['spw'] = newspwsel[output]
728 if validbin:
729 mmsCmd[parname] = freqbinlist[output]
731 self.__ddistart = ddistartlist[output]
732 mmsCmd['ddistart'] = self.__ddistart
733 mmsCmd['outputvis'] = self.dataDir+'/%s.%04d.ms' \
734 % (self.outputBase, output)
736 # Dictionary for the spw/ddi consolidation later
737 self.__ddidict[self.__ddistart] = self.dataDir+'/%s.%04d.ms' \
738 % (self.outputBase, output)
740 if not self._mpi_cluster:
741 self._executionList.append(JobData(self._taskName, mmsCmd))
742 else:
743 self._executionList.append([self._taskName + '()',mmsCmd])
745 @staticmethod
746 def __get_matched_chanbins(task_input_spws, spw_list, task_chanbin):
747 """
748 Produces a reordered list of per-SPW chanbin/width values, so that
749 it matches the SPW list after going through several
750 potentially re-ordering operations done in this helper
751 (for example __createSPWExpression()/__getSPWUniqueList()).
752 The reordered chanbin list is safe to use in functions
753 like __partition1().
755 :param task_input_spws: list of SPWs as given in the task input
756 :param spw_list: the list of SPWs as (potentially) reordered in this helper.
757 For example the output from __getSPWUniqueList()
758 :param task_chanbin: list of chanbin/width as given to the task input parameter
760 :returns: the task_chanbin list reordered to match how the
761 original (task input) list of SPWs has been reordered in
762 this helper into 'spw_list'
763 """
764 param_spw_orig = task_input_spws.split(',')
765 param_spw_spws = [item.partition(':')[0] for item in param_spw_orig]
766 spw_to_idx = {}
767 for spw in spw_list:
768 spw_to_idx[spw] = spw_list.index(str(spw))
770 spw_matched_chanbins = [None] * len(task_chanbin)
771 for freq, spw in zip(task_chanbin, param_spw_spws):
772 spw_idx = spw_to_idx[spw]
773 spw_matched_chanbins[spw_idx] = freq
775 return spw_matched_chanbins
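    # Worked example, for illustration: with task_input_spws='3:10~20,1',
    # spw_list=['1','3'] and task_chanbin=[4,8], the returned list is [8,4],
    # i.e. the chanbin values are reordered to line up with the sorted spw_list.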
777# TO BE DEPRECATED
778 def __createDefaultSeparationCommands(self):
779 """ This method is to generate a list of commands to partition
780 the data based on both scan/spw axes.
781 """
782 import math
784 casalog.post('Partition per scan/spw will ignore NULL combinations of these two parameters.')
786 # Separates in scan and spw axes
787 self.__selectMS()
789 # Get the list of spectral windows as strings
790 spwList = self.__getSPWUniqueList()
791 spwList = list(map(str,spwList))
793 # Check if we can just divide on SPW or if we need to do SPW and scan
794 numSubMS = self._arg['numsubms']
795 if isinstance(numSubMS,str) and numSubMS == 'auto':
796 # Create the best load balance based on the number of nodes
797 numSubMS = self.getNumberOfServers()
798 if numSubMS == None:
799 numSubMS = 8
800 numSpwPartitions = min(len(spwList),numSubMS)
801 numScanPartitions = int(math.ceil(numSubMS/float(numSpwPartitions)))
803 if numScanPartitions > 1:
804 # Check that the scanlist is not null
805 scanList = self.__selectionScanList
806 if scanList is None:
807 scanList = self.__getScanList()
809 # Check that the number of scans is enough for the partitions
810 if len(scanList) < numScanPartitions:
811 numScanPartitions = len(scanList)
812 else:
813 scanList = None
815 partitionedSpws = self.__partition1(spwList,numSpwPartitions)
816 partitionedScans = self.__partition(scanList,numScanPartitions)
818 # The same list but as a dictionary
819 str_partitionedScans = self.__partition1(scanList,numScanPartitions)
821 # Validate the chanbin parameter
822 validbin = False
823 parname = self.getChanAvgParamName()
824 if self.validateChanBin():
825 if isinstance(self._arg[parname],list):
826 freqbinlist = self.__partition1(self._arg[parname],numSpwPartitions)
827 validbin = True
829 # Add the channel selections back to the spw expressions
830 newspwsel = self.__createSPWExpression(partitionedSpws)
832 # Calculate the ddistart for the subMSs (for each engine)
833 ddistartlist = self.__calculateDDIstart(str_partitionedScans, partitionedSpws)
835 if (len(ddistartlist) != len(range(numSpwPartitions*numScanPartitions))):
836 casalog.post('Error calculating ddistart for the engines', 'SEVERE')
837 raise
839 # Set the first DD ID for the sub-table consolidation
840 ddi0 = ddistartlist[0]
841 self.__ddistart = 0
843 # index that composes the subms names (0000, 0001, etc.)
844 sindex = 0
845 for output in range(numSpwPartitions*numScanPartitions):
847 # Avoid the NULL MS selections by verifying that the
848 # combination scan-spw exist.
849 scansellist = map(str, partitionedScans[output%numScanPartitions])
850 selscan = ''
851 for ss in scansellist:
852 selscan = selscan + ',' + ss
853 selscan = selscan.lstrip(',')
854 if not self.__scanspwSelection(selscan,
855 str(newspwsel[output//numScanPartitions])):
856 continue
858 # The first valid subMS must have DDI 0
859 if sindex == 0:
860 self.__ddidict[0] = self.dataDir+'/%s.%04d.ms'%\
861 (self.outputBase, sindex)
862 else:
863 self.__ddistart = ddistartlist[output]
865 if self.__ddistart != ddi0:
866 ddi0 = ddistartlist[output]
867 # Dictionary for sub-table consolidation
868 self.__ddidict[self.__ddistart] = self.dataDir+'/%s.%04d.ms'% \
869 (self.outputBase, sindex)
871 mmsCmd = copy.copy(self._arg)
872 mmsCmd['createmms'] = False
873 mmsCmd['scan'] = ParallelTaskHelper.listToCasaString \
874 (partitionedScans[output%numScanPartitions])
876 mmsCmd['spw'] = newspwsel[output//numScanPartitions]
877 if validbin:
878 mmsCmd[parname] = freqbinlist[output//numScanPartitions]
879 mmsCmd['ddistart'] = self.__ddistart
880 mmsCmd['outputvis'] = self.dataDir+'/%s.%04d.ms' \
881 % (self.outputBase, sindex)
883 if not self._mpi_cluster:
884 self._executionList.append(JobData(self._taskName, mmsCmd))
885 else:
886 self._executionList.append([self._taskName + '()',mmsCmd])
888 sindex += 1 # index of subMS name
890 def __createBalancedSeparationCommands(self):
891 """ Generate a list of partition commands based on table language
892 queries that distribute the scan/spw pairs among subMSs to
893 balance the load along field, spw and scan axes
894 """
896 casalog.post('Automatically distribute the scan/spw pairs to balance the load along field, spw and scan axes')
898 # Get parameters for the partition map function
899 msfilename = self._arg['vis']
900 numSubMS = self._arg['numsubms']
901 if isinstance(numSubMS,str) and numSubMS == 'auto':
902 # Create the best load balance based on the number of nodes
903 numSubMS = self.getNumberOfServers()
904 if numSubMS == None:
905 numSubMS = 8
907 selection = self.__getSelectionFilter()
909 # Get partition map
910 partitionMap = ph.getPartitionMap(msfilename, numSubMS, selection, axis=['field','spw','scan'],plotMode=0)
912 # Iterate over list of subMSs
913 for subms in partitionMap:
915 mmsCmd = copy.deepcopy(self._arg)
916 mmsCmd['createmms'] = False
917 mmsCmd['taql'] = partitionMap[subms]['taql']
918 mmsCmd['outputvis'] = self.dataDir + '/%s.%04d.ms' % (self.outputBase, subms)
920 if not self._mpi_cluster:
921 self._executionList.append(JobData(self._taskName, mmsCmd))
922 else:
923 self._executionList.append([self._taskName + '()',mmsCmd])
925 def __createBaselineSeparationCommands(self):
926 """ This method is to generate a list of commands to partition
927 the data based on baseline.
928 """
930 # Get the available baselines in MS (it will take selections into account)
931 baselineList = self.__getBaselineList()
933 # Make sure we have enough baselines to create the needed number of
934 # subMSs. If not change the total expected.
935 numSubMS = self._arg['numsubms']
936 if isinstance(numSubMS,str) and numSubMS == 'auto':
937 # Create the best load balance based on the number of nodes
938 numSubMS = self.getNumberOfServers()
939 if numSubMS == None:
940 numSubMS = 8
942 numSubMS = min(len(baselineList),numSubMS)
944 # Create a map of the baselines to distribute in each subMS
945 # Example of baselinePartitions
946 # {0: [[0, 0]], 1: [[0, 1], [0, 2]], 2: [[0, 3]], 3: [[1, 1]], 4: [[1, 2]]}
947 baselinePartitions = self.__partition1(baselineList, numSubMS)
949 # Use the above list of baselines to construct a TaQL expression for each subMS
950 submsBaselineMap = {}
951 for subms in baselinePartitions.keys():
952 submsBaselineMap[subms] = {}
953 mytaql = []
954 submsPair = baselinePartitions[subms]
955 ant1ant2 = []
956 for idx in range(len(submsPair)):
957 ant1ant2 = submsPair[idx]
958 if type(ant1ant2) == list:
959 ant1 = ant1ant2[0]
960 ant2 = ant1ant2[1]
961 mytaql.append(('(ANTENNA1==%i && (ANTENNA2 IN %i))') % (ant1, ant2))
963 mytaql = ' OR '.join(mytaql)
964 submsBaselineMap[subms]['taql'] = mytaql
966 # Create the commands for each SubMS (each engine)
967 for output in range(numSubMS):
968 mmsCmd = copy.copy(self._arg)
969 mmsCmd['createmms'] = False
970 mmsCmd['taql'] = submsBaselineMap[output]['taql']
972 mmsCmd['outputvis'] = self.dataDir+'/%s.%04d.ms' \
973 % (self.outputBase, output)
975 if not self._mpi_cluster:
976 self._executionList.append(JobData(self._taskName, mmsCmd))
977 else:
978 self._executionList.append([self._taskName + '()',mmsCmd])
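    # Worked example, for illustration: a partition such as {1: [[0, 1], [0, 2]]}
    # yields the TaQL expression
    #   '(ANTENNA1==0 && (ANTENNA2 IN 1)) OR (ANTENNA1==0 && (ANTENNA2 IN 2))'
    # so that each subMS only reads the rows of its assigned baselines.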
981 def __scanspwSelection(self, scan, spw):
982 """ Return True if the scan/spw selection is valid, False otherwise. """
984 isSelected = False
985 mysel = {}
986 mysel['scan'] = scan
987 mysel['spw'] = spw
989 if self._msTool is None:
990 # Open up the msTool
991 self._msTool = ms( )
992 self._msTool.open(self._arg['vis'])
993 else:
994 self._msTool.reset()
996 try:
997 isSelected = self._msTool.msselect(mysel)
998 except:
999 isSelected = False
1000 casalog.post('Ignoring NULL combination of scan=%s and spw=%s'%
1001 (scan, spw), 'DEBUG1')
1003 return isSelected
1005 def __calculateDDIstart(self, partedscans, partedspws):
1006 """ Calculate the list of DDI values for each partition (each engine).
1008 Keyword arguments:
1009 partedscans --- dictionary of parted scans as returned from self.__partition1()
1010 partedspws --- dictionary of parted spws as returned from self.__partition1()
1012 It returns a list of ddistart values with the same size of the number of subMSs.
1013 """
1015 # Example of partedspws:
1016 # create 2 subMss with spw=0,1,2 and spw=3
1017 # partedSPWs = {0:['0','1','2'],1:['3']}
1018 #
1019 # create 3 subMSs with spw=0,1,2 spw=3 and spw=4,5
1020 # partedSPWs = {0:['0','1','2'],1:['3'],2:['4','5']}
1022 hasscans = True
1023 if len(partedscans) == 0:
1024 scans = ''
1025 hasscans = False
1027 # It needs to take the correlation selection into account
1028 corr_sel = self._arg['correlation']
1029 ddistartList = []
1031 # scan+spw separation axis
1032 if hasscans:
1033 count = 0
1034 for k,spws in lociteritems(partedspws):
1035 for ks,scans in lociteritems(partedscans):
1036 if self._msTool is None:
1037 self._msTool = ms( )
1038 self._msTool.open(self._arg['vis'],nomodify=False)
1039 else:
1040 self._msTool.reset()
1042 try:
1043 # The dictionary with selected indices
1044 seldict = self._msTool.msseltoindex(vis=self._arg['vis'],scan=scans,spw=spws,polarization=corr_sel)
1045 except:
1046 self._msTool.close()
1047 continue
1049 # Get the selected DD IDs
1050 ddis = seldict['dd'].tolist()
1051 ddsize = ddis.__len__()
1052 if count == 0:
1053 ddistart = 0
1055 # Create a ddistart list
1056 ddistartList.append(ddistart)
1057 ddistart = ddistart + ddsize
1058 count = count + 1
1060 # spw separation axis
1061 else:
1062 count = 0
1063 for k,spws in lociteritems(partedspws):
1064 if self._msTool is None:
1065 self._msTool = ms( )
1066 self._msTool.open(self._arg['vis'],nomodify=False)
1067 else:
1068 self._msTool.reset()
1070 try:
1071 # The dictionary with selected indices
1072 seldict = self._msTool.msseltoindex(vis=self._arg['vis'],scan=scans,spw=spws, polarization=corr_sel)
1073 except:
1074 self._msTool.reset()
1075 continue
1077 # Get the selected DD IDs
1078 ddis = seldict['dd'].tolist()
1079 ddsize = ddis.__len__()
1080 if count == 0:
1081 ddistart = 0
1083 # Create a ddistart list
1084 ddistartList.append(ddistart)
1085 ddistart = ddistart + ddsize
1086 count = count + 1
1088 return ddistartList
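    # Worked example, for illustration: for a spw-only partition
    # partedspws={0: ['0','1','2'], 1: ['3']}, assuming one DD ID per spw and
    # no scan partitioning, the returned list is [0, 3]: the second subMS
    # starts writing its DATA_DESCRIPTION rows at index 3.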
1090 def __selectMS(self):
1091 """ This method will open the MS and ensure whatever selection criteria
1092 have been requested are honored. If scanList is not None then it
1093 is used as the scan selection criteria.
1094 """
1096 if self._msTool is None:
1097 self._msTool = ms( )
1098 self._msTool.open(self._arg['vis'])
1099 else:
1100 self._msTool.reset()
1102 # It returns a dictionary if there was any selection otherwise None
1103 self.__selectionFilter = self.__getSelectionFilter()
1105 if self.__selectionFilter is not None:
1106 self._msTool.msselect(self.__selectionFilter)
1108 def __getScanList(self):
1109 """ This method returns the scan list from the current ms. Be careful
1110 about having selection already done when you call this.
1111 """
1113 if self._msTool is None:
1114 self.__selectMS()
1116 scanSummary = self._msTool.getscansummary()
1117 scanList = [int(scan) for scan in scanSummary]
1119 if len(scanList) == 0:
1120 raise ValueError("No Scans present in the created MS.")
1122 scanList.sort()
1123 return scanList
1125 def __getSPWUniqueList(self):
1126 """
1127 Returns a list of spectral windows from the current MS (after
1128 selection). The list is sorted by SPW ID. The list holds
1129 integer ID and the IDs are sorted as integers. Note other
1130 functions will map the ints to strings.
1131 Selection is applied via __selectMS().
1132 """
1134 if self._msTool is None:
1135 self.__selectMS()
1137 # Now get the list of SPWs in the selected MS
1138 ddInfo = self._msTool.getspectralwindowinfo()
1139 self.__spwList = [info['SpectralWindowId'] for info in ddInfo.values()]
1141 # I wonder why the concern about uniqueness. mstool.getspectralwindowinfo() should
1142 # not return duplicated SPW IDs...
1143 # Return a unique sorted list:
1144 sorted_spws = list(set(self.__spwList))
1146 # Effectively sort by spw ID (as ints) before distributing to sub-MSs
1147 sorted_spws.sort()
1149 return sorted_spws
1151 def __getBaselineList(self):
1152 """ This method returns the baseline list from the current MS. Be careful
1153 about having selection already done when you call this.
1154 """
1156 # cumulative baseline selections do not reflect on the msselectedindices()
1157 if self._msTool is None:
1158 self.__selectMS()
1161 # If there are any previous antenna selections, use it
1162 if self._arg['antenna'] != '':
1163 baselineSelection = {'baseline':self._arg['antenna']}
1164 try:
1165 self._msTool.msselect(baselineSelection, onlyparse=False)
1166 # IMPORTANT: msselectedindices() will always say there are auto-correlation
1167 # baselines, even when there aren't. In the MMS case, the SubMS creation will
1168 # issue a MSSelectionNullSelection and not be created.
1169 baselinelist = self._msTool.msselectedindices()['baselines']
1170 except:
1171 baselinelist = []
1172 else:
1173 md = msmetadata( )
1174 md.open(self._arg['vis'])
1175 baselines = md.baselines()
1176 md.close()
1177 import numpy as np
1178 baselinelist = np.vstack(np.where(np.triu(baselines))).T
1181 return baselinelist.tolist()
1183 def __getSelectionFilter(self):
1184 """ This method takes the list of specified selection criteria and
1185 puts them into a dictionary. There is a bit of name mangling necessary.
1186 The pairs are: (msselection syntax, mstransform task syntax).
1187 """
1189 selectionPairs = []
1190 selectionPairs.append(('field','field'))
1191 selectionPairs.append(('spw','spw'))
1192 selectionPairs.append(('polarization','correlation'))
1193 selectionPairs.append(('baseline','antenna'))
1194 selectionPairs.append(('time','timerange'))
1195 selectionPairs.append(('scan','scan'))
1196 selectionPairs.append(('uvdist','uvrange'))
1197 selectionPairs.append(('scanintent','intent'))
1198 selectionPairs.append(('observation','observation'))
1199 return self.__generateFilter(selectionPairs)
1201 def __generateFilter(self, selectionPairs):
1202 """It creates a dictionary of the non-empty selection parameters.
1204 Keyword argument:
1205 selectionPairs -- list with task parameter name = msselection parameter name
1206 as returned from __getSelectionFilter()
1208 It will look at the selection parameters in self_arg.
1209 """
1210 filter = None
1211 for (selSyntax, argSyntax) in selectionPairs:
1212 if argSyntax in self._arg and self._arg[argSyntax] != '':
1213 if filter is None:
1214 filter = {}
1215 filter[selSyntax] = self._arg[argSyntax]
1217 return filter
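    # Worked example, for illustration: if the task was called with
    # field='3C286', antenna='DV01&DV02' and all other selections empty, the
    # returned filter is {'field': '3C286', 'baseline': 'DV01&DV02'}: keys use
    # msselection syntax while the values come from the task arguments.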
1219 def __partition(self, lst, n):
1220 """ This method will split the list lst into "n" almost equal parts
1221 If lst is None, then we assume an empty list.
1222 """
1224 if lst is None:
1225 lst = []
1227 division = len(lst)/float(n)
1229 return [ lst[int(round(division * i)):
1230 int(round(division * (i+1)))] for i in range(int(n))]
1232 def __partition1(self, lst, n):
1233 """ This method will split the list lst into "n" almost equal parts.
1234 if lst is None, then we assume an empty list.
1236 Keyword arguments:
1237 lst --- spw list
1238 n --- numsubms
1240 It returns a dictionary such as:
1241 given the selection spw='0,1:10~20,3,4,5'
1242 rdict = {0: ['0','1'], 1:['3','4','5']}
1243 """
1245 if lst is None:
1246 lst = []
1248 # Create a dictionary for the parted spws:
1249 rdict = {}
1250 division = len(lst)/float(n)
1251 for i in range(int(n)):
1252 part = lst[int(round(division * i)):int(round(division * (i+1)))]
1253 rdict[i] = part
1255 return rdict
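    # A minimal standalone sketch of the same splitting logic, for illustration
    # (hypothetical helper name, not part of this module):
    #
    #   def partition_sketch(lst, n):
    #       division = len(lst) / float(n)
    #       return {i: lst[int(round(division * i)):int(round(division * (i + 1)))]
    #               for i in range(int(n))}
    #
    #   partition_sketch(['0', '1', '2', '3', '4'], 3)
    #   # -> {0: ['0', '1'], 1: ['2'], 2: ['3', '4']}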
1257 def __chanSelection(self, spwsel):
1258 """ Create a dictionary of channel selections.
1260 Keyword arguments:
1261 spwsel --- a string with spw selection
1263 It returns a dictionary such as:
1264 spwsel = '0,1:10~20'
1265 seldict = {0: {'channels': '', 'spw': '0'},
1266 1: {'channels': '10~20', 'spw': '1'}}
1267 """
1269 # Split to get each spw in a list
1270 if spwsel.__contains__(','):
1271 spwlist = spwsel.split(',')
1272 else:
1273 spwlist = spwsel.split(';')
1275 spwid=[]
1276 chanlist=[]
1277 # Split to create two lists, one with channels, the other with spwIDs
1278 for isel in spwlist:
1280 # Split into the spw part, the colon separator and the channel part
1280 (s, c, ch) = isel.rpartition(":")
1281 # Remove any blanks
1282 s = s.strip(' ')
1283 c = c.strip(' ')
1284 ch = ch.strip(' ')
1285 # If the spw part is empty, there was no colon; the whole item is the spwID
1286 if s == "":
1287 spwid.append(ch)
1288 chanlist.append('')
1289 else:
1290 spwid.append(s)
1291 chanlist.append(ch)
1293 # Create a dictionary
1294 seldict = {}
1295 for ns in range(len(spwid)):
1296 sel = {}
1297 sel['spw'] = spwid[ns]
1298 sel['channels'] = chanlist[ns]
1299 seldict[ns] = sel
1302 return seldict
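    # Worked example, for illustration: __chanSelection('0:1~10,2') returns
    #   {0: {'spw': '0', 'channels': '1~10'}, 1: {'spw': '2', 'channels': ''}}
    # so that the channel ranges can be re-attached to the right spw IDs by
    # __createSPWExpression().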
1304 def __createSPWExpression(self, partdict):
1305 """ Creates the final spw expression that will be sent to the engines.
1306 This adds back the channel selections to their spw counterparts.
1308 Keyword arguments:
1309 partdict --- dictionary from __partition1, such as:
1311 Ex: partdict = {0: ['0','1'], 1:['3','4','5']}
1312 when selection is spw = '0,1:10~20,3,4,5'
1313 and effective number of subMSs is 2.
1314 """
1316 # Create a dictionary of the spw/channel selections
1317 # Ex: seldict = {0: {'channels': '', 'spw': '0'},
1318 # 1: {'channels': '10~20', 'spw': '1'}}
1319 seldict = self.__chanSelection(self.__spwSelection)
1321 newdict = copy.copy(partdict)
1323 # Match the spwId of partdict with those from seldict
1324 # For the matches that contain channel selection in seldict,
1325 # Add them to the spwID string in partdict
1326 for keys,vals in seldict.items():
1327 for k,v in partdict.items():
1328 for i in range(len(v)):
1329# if v[i] == seldict[keys]['spw'] and seldict[keys]['channels'] != '':
1330# if v[i] == vals['spw'] and vals['channels'] != '':
1331 # matches, now edit pardict
1332 if v[i] == vals['spw']:
1333 if vals['channels'] != '':
1334 spwexpr = vals['spw'] + ':' + vals['channels']
1335 else:
1336 # spwexpr = seldict[keys]['spw'] + ':' + seldict[keys]['channels']
1337 spwexpr = vals['spw']
1338 newdict[k][i] = spwexpr
1340 # We now have a new dictionary of the form:
1341 # newdict = {0: ['0', '1:10~20'], 1: ['3', '4','5']}
1342 # We want it to be:
1343 # newdict = {0: "0,1:10~20",1: "3, 4,5"}
1345 # Add a comma separator for each expression making
1346 # a single string for each key
1347 for k,v in newdict.items():
1348 spwstr = ""
1349 for s in range(len(v)):
1350 spwstr = spwstr + v[s] + ','
1351 newdict[k] = spwstr.rstrip(',')
1353 casalog.post('Dictionary of spw expressions is: ','DEBUG')
1354 casalog.post ('%s'%newdict,'DEBUG')
1356 return newdict
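    # Worked example, for illustration: with the original selection
    # spw='0,1:10~20,3,4,5' and partdict={0: ['0','1'], 1: ['3','4','5']}, the
    # returned expressions are {0: '0,1:10~20', 1: '3,4,5'}, i.e. each engine
    # receives its spw list with any channel ranges re-attached.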
1358 def getChanAvgParamName(self):
1359 """ Get the channel average bin parameter name.
1360 It will return a string with the parameter name, based on
1361 the calling task.
1362 """
1364 casalog.origin("ParallelDataHelper")
1366 if self.__taskname == None:
1367 return None
1369 if self.__taskname == 'mstransform':
1370 return 'chanbin'
1371 elif self.__taskname == 'split' or self.__taskname == 'split2':
1372 return 'width'
1374 return None
1376 def validateChanBin(self):
1377 """ Check if the channel average bin parameter has the same
1378 size as the spw selection.
1380 Returns True if parameter is valid or False otherwise.
1381 This method must use the local class self.__args parameters.
1382 TBD: make it a static method
1383 """
1385 casalog.origin("ParallelDataHelper")
1387 retval = True
1389 # Get the parameter name, which depends on the task calling this class
1390 parname = self.getChanAvgParamName()
1391 casalog.post('Channel average parameter is called %s'%parname,'DEBUG1')
1392 if parname == None:
1393 retval = False
1395 elif parname in self.__args:
1396 fblist = self.__args[parname]
1397 if isinstance(fblist,list):
1399 if len(fblist) > 1:
1400 if self.__spwList == None:
1401 msTool = ms( )
1402 msTool.open(self.__args['vis'])
1403 spwsel = self.__args['spw']
1404 msTool.msselect({'spw':spwsel})
1405 ddInfo = msTool.getspectralwindowinfo()
1406 self.__spwList = [info['SpectralWindowId'] for info in ddInfo.values()]
1407 msTool.close()
1409 if len(self.__spwList) != len(fblist):
1410 retval = False
1411 raise ValueError('Number of %s is different from the number of spw' %parname)
1414 return retval
1416 def defaultRegridParams(self):
1417 """ Reset the default values of the regridms transformation parameters based on the mode.
1418 Specific for mstransform task.
1419 This method must use the local class self.__args parameters.
1420 TBD: make it a static method
1421 """
1423 casalog.origin("ParallelDataHelper")
1425 if self.__args['mode'] == 'channel' or self.__args['mode'] == 'channel_b':
1426 self.__args['start'] = str(self.__args['start'])
1427 self.__args['width'] = str(self.__args['width'])
1429 elif self.__args['mode'] == 'velocity':
1430 restfreq = self.__args['restfreq']
1431 if restfreq == "" or restfreq.isspace():
1432 raise ValueError("Parameter restfreq must be set when mode='velocity'")
1434 if self.__args['start'] == 0:
1435 self.__args['start'] = ''
1437 if self.__args['width'] == 1:
1438 self.__args['width'] = ''
1441 # Check if the parameter has valid velocity units
1442 if not self.__args['start'] == '':
1443 if (_qa.quantity(self.__args['start'])['unit'].find('m/s') < 0):
1444 raise TypeError('Parameter start does not have valid velocity units')
1446 if not self.__args['width'] == '':
1447 if (_qa.quantity(self.__args['width'])['unit'].find('m/s') < 0):
1448 raise TypeError('Parameter width does not have valid velocity units')
1450 elif self.__args['mode'] == 'frequency':
1451 if self.__args['start'] == 0:
1452 self.__args['start'] = ''
1453 if self.__args['width'] == 1:
1454 self.__args['width'] = ''
1456 # Check if the parameter has valid frequency units
1457 if not self.__args['start'] == '':
1458 if (_qa.quantity(self.__args['start'])['unit'].find('Hz') < 0):
1459 raise TypeError('Parameter start does not have valid frequency units')
1461 if not self.__args['width'] == '':
1462 if (_qa.quantity(self.__args['width'])['unit'].find('Hz') < 0):
1463 raise TypeError('Parameter width does not have valid frequency units')
1465 start = self.__args['start']
1466 width = self.__args['width']
1468 return start, width
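    # Worked example, for illustration: for mode='velocity' with the defaults
    # start=0 and width=1, both are reset to ''; a non-empty value must carry
    # velocity units, e.g. start='-50km/s' is accepted whereas start='10MHz'
    # raises a TypeError.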
1470 def postExecution(self):
1471 """ This method overrides the postExecution method of ParallelTaskHelper.
1472 It builds the output reference MS (MMS) from the subMSs created by the engines.
1473 """
1475 casalog.origin("ParallelDataHelper")
1476 if self._msTool:
1477 self._msTool.close()
1479 # We created a data directory and many SubMSs,
1480 # now build the reference MS. The outputList is a
1481 # dictionary of the form:
1482 # {'path/outputvis.data/SUBMSS/outputvis.0000.ms':True,
1483 # 'path/outputvis.data/SUBMSS/outputvis.0001.ms':False}
1484 outputList = {}
1486# if (ParallelTaskHelper.getBypassParallelProcessing()==1):
1487 if (self._cluster == None):
1488 # This is the list of output SubMSs
1489 outputList = self._sequential_return_list
1490 self._sequential_return_list = {}
1491 elif (self._cluster != None):
1492 command_response_list = self._cluster.get_command_response(self._command_request_id_list,True,True)
1493 # Format list in the form of vis dict
1494 for command_response in command_response_list:
1495 outvis = command_response['parameters']['outputvis']
1496 outputList[outvis] = command_response['successful']
1498 casalog.post('postExecution dict of commands and success values: {}'.format(
1499 outputList), 'DEBUG')
1501 # List of failed MSs. TBD
1502 nFailures = []
1504 subMSList = []
1506 nFailures = [v for v in outputList.values() if v == False]
1508 for subMS in outputList:
1509 # Only use the successful (as per command response fields) output MSs
1510 if outputList[subMS]:
1511 subMSList.append(subMS)
1513 subMSList.sort()
1515 if len(subMSList) == 0:
1516 casalog.post("Error: no subMSs were successfully created.", 'WARN')
1517 return False
1519 # When separationaxis='scan' there is no need to give ddistart.
1520 # The tool looks at the whole spw selection and
1521 # creates the indices from it. After the indices are worked out,
1522 # it applies MS selection. We do not need to consolidate either.
1524 # If axis is spw, give a list of the subMSs
1525 # that need to be consolidated. This list is pre-organized
1526 # inside the separation functions above.
1528 # Only when input is MS or MS-like and createmms=True
1529 # Only partition and mstransform have the createmms parameter
1530 if 'createmms' in self._arg and self._arg['createmms'] == True and self._arg['separationaxis'] == 'spw':
1531# if (self._arg['separationaxis'] == 'spw' or
1532# self._arg['separationaxis'] == 'auto'):
1533# if (self._arg['separationaxis'] == 'spw'):
1535 casalog.post('Consolidate the sub-tables')
1537 toUpdateList = list(self.__ddidict.values())
1539 toUpdateList.sort()
1540 casalog.post('List to consolidate %s'%toUpdateList,'DEBUG')
1542 # Consolidate the spw sub-tables to take channel selection
1543 # or averages into account.
1544 mtlocal1 = mstransformer()
1545 try:
1546 mtlocal1.mergespwtables(toUpdateList)
1547 mtlocal1.done()
1548 except Exception:
1549 mtlocal1.done()
1550 casalog.post('Cannot consolidate spw sub-tables in MMS','SEVERE')
1551 return False
1553 if len(nFailures) > 0:
1554 casalog.post('%s subMSs failed to be created. This is not an error, if due to selection when creating a Multi-MS'%len(nFailures))
1555 # need to rename/re-index the subMSs
1556 newList = copy.deepcopy(subMSList)
1557 idx = 0
1558 for subms in newList:
1559 suffix = re.findall(r".\d{4}.ms",subms)
1560# newms = subms.rpartition(suffix[-1])[0]
1561 newms = subms[:-len(suffix[-1])]
1562 newms = newms+'.%04d.ms'%idx
1563 os.rename(subms,newms)
1564 newList[idx] = newms
1565 idx += 1
1568 if len(subMSList) == len(newList):
1569 subMSList = newList
1571 # Get the first subMS to be the reference when
1572 # copying the sub-tables to the other subMSs
1573 mastersubms = subMSList[0]
1575 # Get list of all subtables in a subms
1576 thesubtables = ph.getSubtables(mastersubms)
1578 # Remove the SOURCE and HISTORY tables, which will be the only ones copied.
1579 # All other sub-tables will be linked to the first subMS
1580 thesubtables.remove('SOURCE')
1581 thesubtables.remove('HISTORY')
1583 subtabs_to_omit = thesubtables
1585 # Parallel axis to write to table.info of MMS
1586 # By default take the one from the input MMS
1587 parallel_axis = ph.axisType(self.__args['vis'])
1588 if 'createmms' in self._arg and self._arg['createmms'] == True:
1589 parallel_axis = self._arg['separationaxis']
1591 if parallel_axis == 'auto' or parallel_axis == 'both':
1592 parallel_axis = 'scan,spw'
1594 # Copy sub-tables from first subMS to the others. The tables in
1595 # subtabs_to_omit are linked instead of copied.
1596 casalog.post("Finalizing MMS structure")
1597 ph.makeMMS(self._arg['outputvis'], subMSList,
1598 True, # copy subtables (will copy only the SOURCE and HISTORY tables)
1599 subtabs_to_omit, # omitting these
1600 parallel_axis
1601 )
1603 thesubmscontainingdir = os.path.dirname(subMSList[0].rstrip('/'))
1605 shutil.rmtree(thesubmscontainingdir)
1607 # Sanity check on the just created MMS
1608 # check for broken symlinks
1609 try:
1610 with open(os.devnull, 'w') as null:
1611 p = subprocess.Popen(['find', '-L', self._arg['outputvis'], '-type', 'l'],
1612 universal_newlines=True, stdout=subprocess.PIPE, stderr=null)
1613 o, e = p.communicate()
1614 if o:
1615 casalog.post('The new MMS contains broken symlinks. Please verify', 'SEVERE')
1616 casalog.post(o, 'SEVERE')
1617 return False
1618 except:
1619 pass
1621 return True