Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/parallel/parallel_data_helper.py: 8%

793 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-10-31 19:10 +0000

1#!/usr/bin/env python 

2import os 

3import re 

4import shutil 

5import string 

6import copy 

7import time 

8import subprocess 

9 

10from numpy.f2py.auxfuncs import throw_error 

11 

12from .parallel_task_helper import ParallelTaskHelper, JobData 

13from .. import partitionhelper as ph 

14from casatools import quanta, ms, msmetadata, mstransformer, table 

15from casatasks import casalog 

16 

17_qa = quanta() 

18 

19# common function to use to get a dictionary item iterator 

20def lociteritems(adict): 

21 return adict.items() 

22 

23""" 

24ParallelDataHelper is a class to process Multi-MS. It can process the MMS 

25as input and as an output. When the input is an MMS, this class will create 

26parallel jobs for each subMS and execute them using ParallelTaskHelper. 

27When the purpose is to create an output MMS, this class will work the heuristics 

28to partition the input MS based on the given separation axis and the transformations 

29performed by the client task. 

30 

31There are two types of tasks that may use this class: 

321) tasks that read an input MMS and create an output MMS with the same 

33 structure of the input (similar partition axis). Ex: split2 

34 

352) tasks that do 1) and can also create an output MMS from an input MS. 

36 These tasks have a parameter createmms with several sub-parameters. 

37 Ex: partition and mstransform. 

38 

39In order to call ParallelDataHelper from a task, a few things are needed: 

40See examples in task_mstransform, task_partition.py, task_split2 or task_hanningsmooth2.py. 

41 

42 * It is assumed that the client task has data selection parameters, or at least 

43 spw and scan parameters. 

44  

45 * ParallelDataHelper will override the ParallelTaskHelper methods: 

46 isParallelMS(), initialize(), generateJobs() and postExecution(). 

47  

48 1) How to use ParallelDataHelper to process an input MMS (similar to split2) 

49 and create an output MMS with the same parallel structure of the input. 

50  

51 from parallel.parallel_data_helper import ParallelDataHelper 

52  

53 # Initiate the helper class  

54 pdh = ParallelDataHelper('taskname', locals())  

55  

56 # Validate input and output parameters 

57 pdh.setupIO() 

58  

59 # To read and work with input MMS (the output will be an MMS too) 

60 if pdh.isParallelMS() and keepmms==True: 

61  

62 # validate some parameters. In some cases the MMS needs to be 

63 # treated as a monolithic MS. In these cases, use mstransform instead 

64 pdh.validateInputParams() 

65  

66 # Get a cluster 

67 pdh.setupCluster('taskname') 

68  

69 # run the jobs in parallel 

70 try: 

71 pdh.go() 

72 except Exception as instance: 

73 casalog.post('%s'%instance,'ERROR') 

74 return False 

75  

76 return True 

77 

78 2) How to use ParallelDataHelper to create an output MMS (similar to partition) 

79  

80 from parallel.parallel_data_helper import ParallelDataHelper 

81  

82 # Initiate the helper class  

83 pdh = ParallelDataHelper('taskname', locals())  

84  

85 # Validate input and output parameters 

86 pdh.setupIO() 

87  

88 if createmms==True:  

89  

90 # Get a cluster 

91 pdh.setupCluster('taskname') 

92  

93 try: 

94 pdh.go() 

95 except Exception as instance: 

96 casalog.post('%s'%instance,'ERROR') 

97 return False 

98  

99 return True 

100""" 

101 

102 

103class ParallelDataHelper(ParallelTaskHelper): 

104 

105 def __init__(self, thistask, args={}): 

106 self.__args = dict(args) 

107 

108 self.__taskname = thistask 

109 

110 self.__selectionScanList = None 

111 self.__selectionBaselineList = None 

112 self.__ddistart = None 

113 self._msTool = None 

114 self._tbTool = None 

115 

116 if not 'spw' in self.__args: 

117 self.__args['spw'] = '' 

118 

119 if not 'scan' in self.__args: 

120 self.__args['scan'] = '' 

121 

122 self.__spwSelection = self.__args['spw'] 

123 self.__spwList = None 

124 # __ddidict contains the names of the subMSs to consolidate. 

125 # The keys are the ddistart and the values the subMSs names 

126 self.__ddidict = {} 

127 

128 # Start parameter for DDI in main table of each sub-MS. 

129 # It should be a counter of spw IDs starting at 0 

130 if 'ddistart' in self.__args: 

131 self.__ddistart = self.__args['ddistart'] 

132 

133 def setTaskName(self, thistask=''): 

134 self.__taskname = thistask 

135 

136 def setupIO(self): 

137 """ Validate input and output parameters """ 

138 

139 if isinstance(self.__args['vis'], str): 

140 if not os.path.exists(self.__args['vis']): 

141 raise IOError('Visibility data set not found - please verify the name.') 

142 

143 if isinstance(self.__args['outputvis'], str): 

144 # only one output MS 

145 if self.__args['outputvis'].isspace() or self.__args['outputvis'].__len__() == 0: 

146 raise IOError('Please specify outputvis.') 

147 

148 elif os.path.exists(self.__args['outputvis']): 

149 raise IOError("Output MS %s already exists - will not overwrite it."%self.__args['outputvis']) 

150 

151 flagversions = self.__args['outputvis']+".flagversions" 

152 if os.path.exists(flagversions): 

153 raise RuntimeError('The flagversions {} for the output MS already exists. Please' 

154 ' delete it.'.format(flagversions)) 

155 

156 return True 

157 

158 def validateInputParams(self): 

159 """ This method should run before setting up the cluster to work all the 

160 heuristics associated with the input MMS and the several 

161 transformations that the task does. 

162 The parameters are validated based on the client task. 

163 This method must use the self.__args parameters from the local class. 

164  

165 This method will determine if the task can process the MMS in parallel 

166 or not. 

167  

168 It returns a dictionary of the following: 

169 retval{'status': True, 'axis':''} --> can run in parallel  

170 retval{'status': False, 'axis':'value'} --> treat MMS as monolithic MS, set axis of output MMS 

171 retval{'status': False, 'axis':''} --> treat MMS as monolithic MS, create output MS 

172  

173 the new axis, which can be: scan,spw or auto. 

174  

175 """ 

176 # Return dictionary 

177 retval = {} 

178 retval['status'] = True 

179 retval['axis'] = '' 

180 

181 # Get the separationaxis of input MMS.  

182 sepaxis = ph.axisType(self.__args['vis']) 

183 if sepaxis.isspace() or sepaxis.__len__() == 0: 

184 sepaxis = 'unknown' 

185 elif sepaxis == 'scan,spw': 

186 sepaxis = 'auto' 

187 

188 #Get list of subMSs in MMS 

189 subMSList = ParallelTaskHelper.getReferencedMSs(self.__args['vis']) 

190 

191 if self.__taskname == "mstransform": 

192 

193 if (self.__args['combinespws'] == True or self.__args['nspw'] > 1) and \ 

194 (self.__args['timeaverage'] == False): 

195 spwsel = self.__getSpwIds(self.__args['vis'], self.__args['spw']) 

196 # Get dictionary with spwids of all subMS in the MMS 

197 spwdict = ph.getScanSpwSummary(subMSList) 

198 # For each subMS, check if it has the spw selection 

199 for subms in subMSList: 

200 subms_spwids = ph.getSubMSSpwIds(subms, spwdict) 

201 slist = map(str,subms_spwids) 

202 # Check if the subms contains all the selected spws 

203 if not self.__isSpwContained(spwsel, slist): 

204 casalog.post('Cannot combine or separate spws in parallel because the subMSs do not contain all the selected spws',\ 

205 'WARN') 

206 # Set the new separation axis for the output 

207 retval['status'] = False 

208 retval['axis'] = 'scan' 

209 break 

210 

211 elif (self.__args['timeaverage'] == True and self.__args['timespan'] == 'scan') and \ 

212 (self.__args['combinespws'] == False and self.__args['nspw'] == 1): 

213 # Get the value of timebin as a float 

214 timebin = self.__args['timebin'] 

215 tsec = _qa.quantity(timebin,'s')['value'] 

216 scansel = self.__getScanIds(self.__args['vis'], self.__args['scan']) 

217 # For each subms, check if scans length is <= timebin 

218 for subms in subMSList: 

219 if not self.__isScanContained(subms, scansel, tsec): 

220 casalog.post('Cannot process MMS in parallel when timespan=\'scan\' because the subMSs do not contain all the selected scans',\ 

221 'WARN') 

222 # Set the new separation axis for the output 

223 retval['status'] = False 

224 retval['axis'] = 'spw' 

225 break 

226 

227 # Two transformations are requested. 

228 elif (self.__args['combinespws'] == True or self.__args['nspw'] > 1) and \ 

229 (self.__args['timeaverage'] == True and self.__args['timespan'] == 'scan'): 

230 # Check spws and scans in subMSs 

231 spwsel = self.__getSpwIds(self.__args['vis'], self.__args['spw']) 

232 spwdict = ph.getScanSpwSummary(subMSList) 

233 scansel = self.__getScanIds(self.__args['vis'], self.__args['scan']) 

234 timebin = self.__args['timebin'] 

235 tsec = _qa.quantity(timebin,'s')['value'] 

236 for subms in subMSList: 

237 subms_spwids = ph.getSubMSSpwIds(subms, spwdict) 

238 slist = map(str,subms_spwids) 

239 if self.__isSpwContained(spwsel, slist): 

240 if not self.__isScanContained(subms, scansel, tsec): 

241 casalog.post('The subMSs of input MMS do not contain the necessary scans','WARN') 

242 retval['status'] = False 

243 retval['axis'] = '' 

244 break 

245 else: 

246 casalog.post('The subMSs of input MMS do not contain the necessary spws','WARN') 

247 retval['status'] = False 

248 retval['axis'] = '' 

249 break 

250 

251 

252 elif self.__taskname == "split2" or self.__taskname == "split": 

253 if (sepaxis != 'spw' and self.__args['combine'] == 'scan'): 

254 scansel = self.__getScanIds(self.__args['vis'], self.__args['scan']) 

255 timebin = self.__args['timebin'] 

256 tsec = _qa.quantity(timebin,'s')['value'] 

257 for subms in subMSList: 

258 if not self.__isScanContained(subms, scansel, tsec): 

259 casalog.post('Cannot process MMS in parallel when combine=\'scan\' because the subMSs do not contain all the selected scans',\ 

260 'WARN') 

261 casalog.post("Please set keepmms to False or use task mstransform in this case.",'ERROR') 

262 retval['status'] = False 

263 retval['axis'] = '' 

264 break 

265 

266 elif self.__taskname == "cvel2" and sepaxis != 'scan': 

267 spwsel = self.__getSpwIds(self.__args['vis'], self.__args['spw']) 

268 spwdict = ph.getScanSpwSummary(subMSList) 

269 for subms in subMSList: 

270 subms_spwids = ph.getSubMSSpwIds(subms, spwdict) 

271 slist = map(str,subms_spwids) 

272 # Check if the subms contains all the selected spws 

273 if not self.__isSpwContained(spwsel, slist): 

274 casalog.post('Cannot combine spws in parallel because the subMSs do not contain all the selected spws',\ 

275 'WARN') 

276 casalog.post("Please set keepmms to False or use task mstransform in this case.",'ERROR') 

277 # Set the new separation axis for the output 

278 retval['status'] = False 

279 retval['axis'] = '' 

280 break 

281 

282 

283 return retval 

284 

285 def __getSpwIds(self, msfile, spwsel): 

286 """Get the spw IDs of the spw selection 

287 Keyword arguments 

288 msfile -- MS or MMS name 

289 spwsel -- spw selection 

290  

291 It will remove the channels from the selection and return only the spw ids. 

292 """ 

293 myspwsel = spwsel 

294 if myspwsel.isspace() or myspwsel.__len__() == 0: 

295 myspwsel = '*' 

296 

297 spwlist = [] 

298 msTool = ms( ) 

299 try: 

300 seldict = msTool.msseltoindex(vis=msfile,spw=myspwsel) 

301 except: 

302 return spwlist 

303 

304 spwids = list(set(seldict['spw'])) 

305 spwlist = map(str,spwids) 

306 

307 del msTool 

308 return spwlist 

309 

310 def __isSpwContained(self, spwlist, subms_spws): 

311 """ Return True if the subMS contains the spw selection or False otherwise.  

312 Keyword arguments: 

313 spwlist -- list of selected spwids in MMS, e.g. ['0','1']. Do not include channels 

314 subms_spws -- list of spwids in subMS 

315 """ 

316 

317 isSelected = False 

318 

319 # Check if the selected spws are in the subMS 

320 if set(spwlist) <= set(subms_spws): 

321 isSelected = True 

322 

323 return isSelected 

324 

325 def __getScanIds(self, msfile, scansel): 

326 """ Get the scan IDs of the scan selection. 

327 Keyword arguments: 

328 msfile -- MS or MMS name 

329 scansel -- scan selection 

330 

331 Returns a list of the scan IDs (list of strings) or [] in case of failure. 

332 """ 

333 scanlist = [] 

334 if scansel.isspace() or scansel.__len__() == 0: 

335 # Get all the scan ids 

336 mymsmd = msmetadata( ) 

337 mymsmd.open(msfile) 

338 scans = mymsmd.scannumbers() 

339 mymsmd.close() 

340 scanlist = map(str,scans) 

341 else: 

342 myms = ms() 

343 try: 

344 myms.open(msfile) 

345 myms.msselect({'scan':scansel}) 

346 scans = myms.msselectedindices()['scan'] 

347 scanlist = map(str,scans) 

348 except: 

349 scanlist = [] 

350 finally: 

351 myms.close() 

352 

353 return scanlist 

354 

355 def __isScanContained(self, subms, scanlist, tbin): 

356 """ Check if subMS contains all the selected scans 

357 and if the duration of the subMS scans is larger or  

358 equal to the timebin. 

359  

360 Keyword arguments: 

361 subms -- subMS name 

362 scanlist -- list with selected scans for the MMS 

363 tbin -- timebin as a Float 

364  

365 Returns True on success, False otherwise. 

366 """ 

367 isContained = False 

368 

369 mymsmd = msmetadata( ) 

370 mymsmd.open(subms) 

371 

372 # Check if subms scans contain all selected scans 

373 hasScans = False 

374 s = mymsmd.scannumbers() 

375 subms_scans = map(str, s) 

376 if set(scanlist) <= set(subms_scans): 

377 hasScans = True 

378 

379 if hasScans: 

380 t = mymsmd.timesforscans(s) 

381 mymsmd.close() 

382 t_range = t.max() - t.min() 

383 

384 if t_range >= tbin: 

385 isContained = True 

386 

387 return isContained 

388 

389 def validateOutputParams(self): 

390 """ This method should run before setting up the cluster to work all the 

391 heuristics associated with the separationaxis and the several 

392 transformations that the task does. 

393 This method must use the local class self.__args parameters 

394 """ 

395 

396 # success 

397 retval = 1 

398 if not 'separationaxis' in self.__args: 

399 return retval 

400 

401 else: 

402 sepaxis = self.__args['separationaxis'] 

403 

404 # Task mstransform 

405 if self.__taskname == "mstransform": 

406 if sepaxis != 'scan' and (self.__args['combinespws'] == True or self.__args['nspw'] > 1): 

407 casalog.post('Cannot partition MS per spw or auto when combinespws = True or nspw > 1', 'WARN') 

408 retval = 0 

409 

410 elif sepaxis != 'spw' and self.__args['timespan'] == 'scan': 

411 casalog.post('Time averaging across scans may lead to wrong results when separation axis is not spw', 'WARN') 

412 

413 return retval 

414 

415 @staticmethod 

416 def isMMSAndNotServer(vis): 

417 """ 

418 Is vis a Multi-MS, and I am not an MPI server? 

419 The return value is used to know if sub-MS (sub-task) commands should be dispatched 

420 to servers in parallel (MPI) mode. 

421 

422 :param vis: an MS 

423 """ 

424 # This checks the "NotServer" condition. If I'm a server, no need to check more 

425 # if MPIEnvironment.is_mpi_enabled and not MPIEnvironment.is_mpi_client: 

426 if ParallelTaskHelper.isMPIEnabled() and not ParallelTaskHelper.isMPIClient(): 

427 return False 

428 

429 # Note: using the ParalleTaskHelper version which honors the __bypass_parallel trick 

430 return ParallelTaskHelper.isParallelMS(vis) 

431 

432 @staticmethod 

433 def isParallelMS(vis): 

434 """ This method will read the value of SubType in table.info 

435 of the Multi-MS or MS.  

436 NOTE: this method is duplicated in in parallel_task_helper, with the addition of 

437 a check on "ParallelTaskHelper.__bypass_parallel_processing". 

438  

439 Keyword arguments: 

440 vis -- name of MS or Multi-MS 

441  

442 It returns True if SubType is CONCATENATED, False otherwise. 

443 This method overrides the one from ParallelTaskHelper. 

444 """ 

445 

446 msTool = ms( ) 

447 if not msTool.open(vis): 

448 raise ValueError("Unable to open MS %s," % vis) 

449 rtnVal = msTool.ismultims() and \ 

450 isinstance(msTool.getreferencedtables(), list) 

451 

452 msTool.close() 

453 return rtnVal 

454 

455 def override__args(self,arg,value): 

456 """ Override a parameter value in ParallelDataHelper arguments 

457  

458 Keyword arguments: 

459 arg -- name of the parameter 

460 value -- value of the parameter 

461  

462 It is usually used for the outputvis or createmms parameters. 

463 """ 

464 self.__args[arg] = value 

465 

466 def setupCluster(self, thistask=''): 

467 """ Get a cluster 

468  

469 Keyword argument: 

470 thistask -- the task calling this class 

471  

472 ParallelTaskHelper will populate its self._arg dictionary with 

473 a copy of the parameters of the calling task, which have been 

474 set in self.__args. 

475 """ 

476 

477 # It needs to use the updated list of parameters!!! 

478 ParallelTaskHelper.__init__(self, task_name=thistask, args=self.__args) 

479 

480 def setupParameters(self, **pars): 

481 """ Create a dictionary with non-empty parameters  

482  

483 Keyword argument: 

484 **pars -- a dictionary with key:value pairs 

485  

486 It will return a dictionary with only non-empty parameters. 

487 """ 

488 

489 seldict = {} 

490 for k,v in pars.items(): 

491 if v != None and v != "": 

492 seldict[k] = v 

493 

494 return seldict 

495 

496 def validateModelCol(self): 

497 """ Add the realmodelcol parameter to the configuration 

498 only for some values of datacolumn. Specific for mstransform. 

499 This method must use the local class self.__args parameters. 

500 """ 

501 

502 ret = False 

503 

504 dc = self.__args['datacolumn'].upper() 

505 if "MODEL" in dc or dc == 'ALL': 

506 ret = True 

507 

508 return ret 

509 

510 def initialize(self): 

511 """Initializes some parts of the cluster setup. 

512 Add the full path for the input and output MS. 

513 Creates the temporary directory to save each 

514 parallel subMS. The directory is called 

515 <outputvis>.data. 

516 This method overrides the one from ParallelTaskHelper. 

517 """ 

518 

519 casalog.origin("ParallelDataHelper") 

520 

521 # self._arg is populated inside ParallelTaskHelper._init_() 

522 self._arg['vis'] = os.path.abspath(self._arg['vis']) 

523 # MPI setting 

524 if self._mpi_cluster: 

525 self._cluster.start_services() 

526 

527 if (self._arg['outputvis'] != ""): 

528 self._arg['outputvis'] = os.path.abspath(self._arg['outputvis']) 

529 

530 outputPath, self.outputBase = os.path.split(self._arg['outputvis']) 

531 try: 

532 if self.outputBase[-1] == '.': 

533 self.outputBase = self.outputBase[:self.outputBase.rindex('.')] 

534 except ValueError: 

535 # outputBase must not have a trailing . 

536 pass 

537 

538 if self.outputBase == '.' or self.outputBase == './': 

539 raise ValueError('Error dealing with outputvis') 

540 

541 # The subMS are first saved inside a temporary directory 

542 self.dataDir = outputPath + '/' + self.outputBase+'.data' 

543 if os.path.exists(self.dataDir): 

544 shutil.rmtree(self.dataDir) 

545 

546 os.mkdir(self.dataDir) 

547 

548 def generateJobs(self): 

549 """ This is the method which generates all of the actual jobs to be done. 

550 This method overrides the one in ParallelTaskHelper baseclass. 

551 """ 

552 

553 casalog.origin("ParallelDataHelper") 

554 casalog.post("Analyzing MS for partitioning") 

555 if ParallelDataHelper.isParallelMS(self._arg['vis']): 

556 casalog.post("Input vis is a Multi-MS") 

557 

558 

559 # Input MMS, processed in parallel; output is an MMS 

560 # For tasks such as split2, hanningsmooth2 

561 if ParallelDataHelper.isParallelMS(self._arg['vis']) and (not 'monolithic_processing' in self._arg): 

562 self.__createNoSeparationCommand() 

563 

564 # For mstransform when processing input MMS in parallel 

565 elif ParallelDataHelper.isParallelMS(self._arg['vis']) and self._arg['monolithic_processing'] == False: 

566 self.__createNoSeparationCommand() 

567 

568 # For tasks that create an output MMS. In these cases 

569 # input can be an MMS processed monolithically or an input MS 

570 elif self._arg['createmms']: 

571 self.__createPrimarySplitCommand() 

572 

573 return True 

574 

575 def __createNoSeparationCommand(self): 

576 """ Add commands to be executed by the engines when input is an MMS.  

577 This method overrides the following parameter: 

578 self._arg['createmms'] 

579 """ 

580 

581 submslist = ParallelTaskHelper.getReferencedMSs(self._arg['vis']) 

582 if len(submslist) == 0: 

583 raise ValueError('There are no subMSs in input vis') 

584 

585 tbTool = table( ) 

586 

587 listOutputMS = [] 

588 

589 subMs_idx = 0 

590 for subMS in submslist: 

591 

592 # make sure the SORTED_TABLE keywords are disabled 

593 tbTool.open(subMS, nomodify=False) 

594 if 'SORTED_TABLE' in tbTool.keywordnames(): 

595 tobeDeleted = tbTool.getkeyword('SORTED_TABLE').split(' ')[1] 

596 tbTool.removekeyword('SORTED_TABLE') 

597 os.system('rm -rf '+tobeDeleted) 

598 

599 tbTool.close() 

600 

601 listOutputMS.append(self.dataDir+'/%s.%04d.ms' \ 

602 % (self.outputBase, subMs_idx)) 

603 subMs_idx += 1 

604 

605 # Override the original parameters 

606 self.override_arg('outputvis',listOutputMS) 

607 

608 self._consolidateOutput = False 

609 

610 # Add to the list of jobs to execute 

611 subMs_idx = 0 

612 for subMS in submslist: 

613 localArgs = copy.copy(self._arg) 

614 localArgs['vis'] = subMS 

615 for key in self._arguser: 

616 localArgs[key] = self._arguser[key][subMs_idx] 

617 

618 if 'createmms' in self._arg: 

619 self._arg['createmms'] = False 

620 localArgs['createmms'] = False 

621 

622 subMs_idx += 1 

623 if not self._mpi_cluster: 

624 self._executionList.append(JobData(self._taskName, localArgs)) 

625 else: 

626 self._executionList.append([self._taskName + '()',localArgs]) 

627 

628 def __createPrimarySplitCommand(self): 

629 """ This method overwrites the following parameter: 

630 self._arg['separationaxis'] when running the monolithic case 

631 """ 

632 

633 if self._arg['createmms']: 

634 

635 if self._arg['separationaxis'].lower() == 'scan': 

636 self.__createScanSeparationCommands() 

637 elif self._arg['separationaxis'].lower() == 'spw': 

638 self.__createSPWSeparationCommands() 

639 elif self._arg['separationaxis'].lower() == 'baseline': 

640 self.__createBaselineSeparationCommands() 

641 elif self._arg['separationaxis'].lower() == 'auto': 

642 self.__createBalancedSeparationCommands() 

643 else: 

644 # Use a default 

645 self.__createDefaultSeparationCommands() 

646 

647 def __createScanSeparationCommands(self): 

648 """ This method is to generate a list of commands to partition 

649 the data based on scan. 

650 """ 

651 

652 scanList = self.__selectionScanList 

653 if scanList is None: 

654 self.__selectMS() 

655 scanList = self.__getScanList() 

656 

657 # Make sure we have enough scans to create the needed number of 

658 # subMSs. If not change the total expected. 

659 numSubMS = self._arg['numsubms'] 

660 if isinstance(numSubMS,str) and numSubMS == 'auto': 

661 # Create the best load balance based on the number of nodes 

662 numSubMS = self.getNumberOfServers() 

663 if numSubMS == None: 

664 numSubMS = 8 

665 numSubMS = min(len(scanList),numSubMS) 

666 

667 partitionedScans = self.__partition(scanList, numSubMS) 

668 for output in range(numSubMS): 

669 mmsCmd = copy.copy(self._arg) 

670 mmsCmd['createmms'] = False 

671 mmsCmd['scan']= ParallelTaskHelper.\ 

672 listToCasaString(partitionedScans[output]) 

673 mmsCmd['outputvis'] = self.dataDir+'/%s.%04d.ms' \ 

674 % (self.outputBase, output) 

675 if not self._mpi_cluster: 

676 self._executionList.append(JobData(self._taskName, mmsCmd)) 

677 else: 

678 self._executionList.append([self._taskName + '()',mmsCmd]) 

679 

680 def __createSPWSeparationCommands(self): 

681 """ This method is to generate a list of commands to partition 

682 the data based on spw. 

683 """ 

684 

685 # Get a unique list of selected spws  

686 self.__selectMS() 

687 spwList = self.__getSPWUniqueList() 

688 numSubMS = self._arg['numsubms'] 

689 

690 if isinstance(numSubMS,str) and numSubMS == 'auto': 

691 # Create the best load balance based on the number of nodes 

692 numSubMS = self.getNumberOfServers() 

693 if numSubMS == None: 

694 numSubMS = 8 

695 numSubMS = min(len(spwList),numSubMS) 

696 

697 # Get a dictionary of the spws parted for each subMS, with IDs as strings 

698 spwList = list(map(str,spwList)) 

699 partitionedSPWs1 = self.__partition1(spwList,numSubMS) 

700 

701 # Add the channel selections back to the spw expressions 

702 newspwsel = self.__createSPWExpression(partitionedSPWs1) 

703 

704 # Validate the chanbin parameter 

705 validbin = False 

706 parname = self.getChanAvgParamName() 

707 if self.validateChanBin(): 

708 if isinstance(self._arg[parname], list): 

709 matched_chanbins = ParallelDataHelper.__get_matched_chanbins( 

710 self.__args['spw'], spwList, self._arg[parname]) 

711 freqbinlist = self.__partition1(matched_chanbins, numSubMS) 

712 validbin = True 

713 

714 # Calculate the ddistart for each engine. This will be used 

715 # to calculate the DD IDs of the output main table of the subMSs 

716 ddistartlist = self.__calculateDDIstart({}, partitionedSPWs1) 

717 if (len(ddistartlist) != len(partitionedSPWs1)): 

718 casalog.post('Error calculating the ddistart indices','SEVERE') 

719 raise 

720 

721 for output in range(numSubMS): 

722 mmsCmd = copy.copy(self._arg) 

723 mmsCmd['createmms'] = False 

724 if self.__selectionScanList is not None: 

725 mmsCmd['scan'] = ParallelTaskHelper.\ 

726 listToCasaString(self.__selectionScanList) 

727 mmsCmd['spw'] = newspwsel[output] 

728 if validbin: 

729 mmsCmd[parname] = freqbinlist[output] 

730 

731 self.__ddistart = ddistartlist[output] 

732 mmsCmd['ddistart'] = self.__ddistart 

733 mmsCmd['outputvis'] = self.dataDir+'/%s.%04d.ms' \ 

734 % (self.outputBase, output) 

735 

736 # Dictionary for the spw/ddi consolidation later 

737 self.__ddidict[self.__ddistart] = self.dataDir+'/%s.%04d.ms' \ 

738 % (self.outputBase, output) 

739 

740 if not self._mpi_cluster: 

741 self._executionList.append(JobData(self._taskName, mmsCmd)) 

742 else: 

743 self._executionList.append([self._taskName + '()',mmsCmd]) 

744 

745 @staticmethod 

746 def __get_matched_chanbins(task_input_spws, spw_list, task_chanbin): 

747 """ 

748 Produces a reordered list of per-SPW chanbin/width values, so that 

749 it matches the SPW list after groing through several 

750 potentially re-ordering done operations in this helper 

751 (for example __createSPWExpression()/__getSPWUniqueList(). 

752 The reordered chanbin list is safe to use in functions 

753 like __partition1(). 

754 

755 :param task_input_spws: list of SPWs as given in the task input 

756 :param spw_list: the list of SPWs as (potentially) reordered in this helper. 

757 For example the output from __getSPWUniqueList() 

758 :param task_chanbin: list of chanbin/width as given to the task input parameter 

759 

760 :returns: the task_chanbin list reordered to match how the 

761 original (task input) list of SPWs has been reordered in 

762 this helper into 'spw_list' 

763 """ 

764 param_spw_orig = task_input_spws.split(',') 

765 param_spw_spws = [item.partition(':')[0] for item in param_spw_orig] 

766 spw_to_idx = {} 

767 for spw in spw_list: 

768 spw_to_idx[spw] = spw_list.index(str(spw)) 

769 

770 spw_matched_chanbins = [None] * len(task_chanbin) 

771 for freq, spw in zip(task_chanbin, param_spw_spws): 

772 spw_idx = spw_to_idx[spw] 

773 spw_matched_chanbins[spw_idx] = freq 

774 

775 return spw_matched_chanbins 

776 

777# TO BE DEPRECATED 

778 def __createDefaultSeparationCommands(self): 

779 """ This method is to generate a list of commands to partition 

780 the data based on both scan/spw axes. 

781 """ 

782 import math 

783 

784 casalog.post('Partition per scan/spw will ignore NULL combinations of these two parameters.') 

785 

786 # Separates in scan and spw axes 

787 self.__selectMS() 

788 

789 # Get the list of spectral windows as strings 

790 spwList = self.__getSPWUniqueList() 

791 spwList = map(str,spwList) 

792 

793 # Check if we can just divide on SPW or if we need to do SPW and scan 

794 numSubMS = self._arg['numsubms'] 

795 if isinstance(numSubMS,str) and numSubMS == 'auto': 

796 # Create the best load balance based on the number of nodes 

797 numSubMS = self.getNumberOfServers() 

798 if numSubMS == None: 

799 numSubMS = 8 

800 numSpwPartitions = min(len(spwList),numSubMS) 

801 numScanPartitions = int(math.ceil(numSubMS/float(numSpwPartitions))) 

802 

803 if numScanPartitions > 1: 

804 # Check that the scanlist is not null 

805 scanList = self.__selectionScanList 

806 if scanList is None: 

807 scanList = self.__getScanList() 

808 

809 # Check that the number of scans is enough for the partitions 

810 if len(scanList) < numScanPartitions: 

811 numScanPartitions = len(scanList) 

812 else: 

813 scanList = None 

814 

815 partitionedSpws = self.__partition1(spwList,numSpwPartitions) 

816 partitionedScans = self.__partition(scanList,numScanPartitions) 

817 

818 # The same list but as a dictionary 

819 str_partitionedScans = self.__partition1(scanList,numScanPartitions) 

820 

821 # Validate the chanbin parameter 

822 validbin = False 

823 parname = self.getChanAvgParamName() 

824 if self.validateChanBin(): 

825 if isinstance(self._arg[parname],list): 

826 freqbinlist = self.__partition1(self._arg[parname],numSpwPartitions) 

827 validbin = True 

828 

829 # Add the channel selections back to the spw expressions 

830 newspwsel = self.__createSPWExpression(partitionedSpws) 

831 

832 # Calculate the ddistart for the subMSs (for each engine) 

833 ddistartlist = self.__calculateDDIstart(str_partitionedScans, partitionedSpws) 

834 

835 if (len(ddistartlist) != len(range(numSpwPartitions*numScanPartitions))): 

836 casalog.post('Error calculating ddistart for the engines', 'SEVERE') 

837 raise 

838 

839 # Set the first DD ID for the sub-table consolidation 

840 ddi0 = ddistartlist[0] 

841 self.__ddistart = 0 

842 

843 # index that composes the subms names (0000, 0001, etc.) 

844 sindex = 0 

845 for output in range(numSpwPartitions*numScanPartitions): 

846 

847 # Avoid the NULL MS selections by verifying that the 

848 # combination scan-spw exist. 

849 scansellist = map(str, partitionedScans[output%numScanPartitions]) 

850 selscan = '' 

851 for ss in scansellist: 

852 selscan = selscan + ',' + ss 

853 selscan = selscan.lstrip(',') 

854 if not self.__scanspwSelection(selscan, 

855 str(newspwsel[output/numScanPartitions])): 

856 continue 

857 

858 # The first valid subMS must have DDI 0 

859 if sindex == 0: 

860 self.__ddidict[0] = self.dataDir+'/%s.%04d.ms'%\ 

861 (self.outputBase, sindex) 

862 else: 

863 self.__ddistart = ddistartlist[output] 

864 

865 if self.__ddistart != ddi0: 

866 ddi0 = ddistartlist[output] 

867 # Dictionary for sub-table consolidation 

868 self.__ddidict[self.__ddistart] = self.dataDir+'/%s.%04d.ms'% \ 

869 (self.outputBase, sindex) 

870 

871 mmsCmd = copy.copy(self._arg) 

872 mmsCmd['createmms'] = False 

873 mmsCmd['scan'] = ParallelTaskHelper.listToCasaString \ 

874 (partitionedScans[output%numScanPartitions]) 

875 

876 mmsCmd['spw'] = newspwsel[output/numScanPartitions] 

877 if validbin: 

878 mmsCmd[parname] = freqbinlist[output/numScanPartitions] 

879 mmsCmd['ddistart'] = self.__ddistart 

880 mmsCmd['outputvis'] = self.dataDir+'/%s.%04d.ms' \ 

881 % (self.outputBase, sindex) 

882 

883 if not self._mpi_cluster: 

884 self._executionList.append(JobData(self._taskName, mmsCmd)) 

885 else: 

886 self._executionList.append([self._taskName + '()',mmsCmd]) 

887 

888 sindex += 1 # index of subMS name 

889 

890 def __createBalancedSeparationCommands(self): 

891 """ Generate a list of partition commands based on table language  

892 queries that distribute the scan/spw pairs among subMSs to 

893 balance the load along field, spw and scan axes 

894 """ 

895 

896 casalog.post('Automatically distribute the scan/spw pairs to balance the load along field, spw and scan axes') 

897 

898 # Get parameters for the partition map function  

899 msfilename = self._arg['vis'] 

900 numSubMS = self._arg['numsubms'] 

901 if isinstance(numSubMS,str) and numSubMS == 'auto': 

902 # Create the best load balance based on the number of nodes 

903 numSubMS = self.getNumberOfServers() 

904 if numSubMS == None: 

905 numSubMS = 8 

906 

907 selection = self.__getSelectionFilter() 

908 

909 # Get partition map 

910 partitionMap = ph.getPartitionMap(msfilename, numSubMS, selection, axis=['field','spw','scan'],plotMode=0) 

911 

912 # Iterate over list of subMSs 

913 for subms in partitionMap: 

914 

915 mmsCmd = copy.deepcopy(self._arg) 

916 mmsCmd['createmms'] = False 

917 mmsCmd['taql'] = partitionMap[subms]['taql'] 

918 mmsCmd['outputvis'] = self.dataDir + '/%s.%04d.ms' % (self.outputBase, subms) 

919 

920 if not self._mpi_cluster: 

921 self._executionList.append(JobData(self._taskName, mmsCmd)) 

922 else: 

923 self._executionList.append([self._taskName + '()',mmsCmd]) 

924 

925 def __createBaselineSeparationCommands(self): 

926 """ This method is to generate a list of commands to partition 

927 the data based on baseline. 

928 """ 

929 

930 # Get the available baselines in MS (it will take selections into account) 

931 baselineList = self.__getBaselineList() 

932 

933 # Make sure we have enough baselines to create the needed number of 

934 # subMSs. If not change the total expected. 

935 numSubMS = self._arg['numsubms'] 

936 if isinstance(numSubMS,str) and numSubMS == 'auto': 

937 # Create the best load balance based on the number of nodes 

938 numSubMS = self.getNumberOfServers() 

939 if numSubMS == None: 

940 numSubMS = 8 

941 

942 numSubMS = min(len(baselineList),numSubMS) 

943 

944 # Create a map of the baselines to distribute in each subMS 

945 # Example of baselinePartitions 

946 # {0: [[0, 0]], 1: [[0, 1], [0, 2]], 2: [[0, 3]], 3: [[1, 1]], 4: [[1, 2]]} 

947 baselinePartitions = self.__partition1(baselineList, numSubMS) 

948 

949 # Use the above list of baselines to construct a TaQL expression for each subMS 

950 submsBaselineMap = {} 

951 for subms in baselinePartitions.keys(): 

952 submsBaselineMap[subms] = {} 

953 mytaql = [] 

954 submsPair = baselinePartitions[subms] 

955 ant1ant2 = [] 

956 for idx in range(len(submsPair)): 

957 ant1ant2 = submsPair[idx] 

958 if type(ant1ant2) == list: 

959 ant1 = ant1ant2[0] 

960 ant2 = ant1ant2[1] 

961 mytaql.append(('(ANTENNA1==%i && (ANTENNA2 IN %i))') % (ant1, ant2)) 

962 

963 mytaql = ' OR '.join(mytaql) 

964 submsBaselineMap[subms]['taql'] = mytaql 

965 

966 # Create the commands for each SubMS (each engine) 

967 for output in range(numSubMS): 

968 mmsCmd = copy.copy(self._arg) 

969 mmsCmd['createmms'] = False 

970 mmsCmd['taql'] = submsBaselineMap[output]['taql'] 

971 

972 mmsCmd['outputvis'] = self.dataDir+'/%s.%04d.ms' \ 

973 % (self.outputBase, output) 

974 

975 if not self._mpi_cluster: 

976 self._executionList.append(JobData(self._taskName, mmsCmd)) 

977 else: 

978 self._executionList.append([self._taskName + '()',mmsCmd]) 

979 

980 

981 def __scanspwSelection(self, scan, spw): 

982 """ Return True if the selection is True or False otherwise. """ 

983 

984 isSelected = False 

985 mysel = {} 

986 mysel['scan'] = scan 

987 mysel['spw'] = spw 

988 

989 if self._msTool is None: 

990 # Open up the msTool 

991 self._msTool = ms( ) 

992 self._msTool.open(self._arg['vis']) 

993 else: 

994 self._msTool.reset() 

995 

996 try: 

997 isSelected = self._msTool.msselect(mysel) 

998 except: 

999 isSelected = False 

1000 casalog.post('Ignoring NULL combination of scan=%s and spw=%s'% 

1001 (scan, spw), 'DEBUG1') 

1002 

1003 return isSelected 

1004 

1005 def __calculateDDIstart(self, partedscans, partedspws): 

1006 """ Calculate the list of DDI values for each partition (each engine). 

1007  

1008 Keyword arguments: 

1009 partedscans --- dictionary of parted scans as returned from self.__partition1() 

1010 partedspws --- dictionary of parted spws as returned from self.__partition1() 

1011  

1012 It returns a list of ddistart values with the same size of the number of subMSs. 

1013 """ 

1014 

1015 # Example of partedspws: 

1016 # create 2 subMss with spw=0,1,2 and spw=3 

1017 # partedSPWs = {0:['0','1','2'],1:['3']} 

1018 # 

1019 # create 3 subMSs with spw=0,1,2 spw=3 and spw=4,5 

1020 # partedSPWs = {0:['0','1','2'],1:['3'],2:['4','5']} 

1021 

1022 hasscans = True 

1023 if len(partedscans) == 0: 

1024 scans = '' 

1025 hasscans = False 

1026 

1027 # It needs to take the correlation selection into account 

1028 corr_sel = self._arg['correlation'] 

1029 ddistartList = [] 

1030 

1031 # scan+spw separation axis  

1032 if hasscans: 

1033 count = 0 

1034 for k,spws in lociteritems(partedspws): 

1035 for ks,scans in lociteritems(partedscans): 

1036 if self._msTool is None: 

1037 self._msTool = ms( ) 

1038 self._msTool.open(self._arg['vis'],nomodify=False) 

1039 else: 

1040 self._msTool.reset() 

1041 

1042 try: 

1043 # The dictionary with selected indices 

1044 seldict = self._msTool.msseltoindex(vis=self._arg['vis'],scan=scans,spw=spws,polarization=corr_sel) 

1045 except: 

1046 self._msTool.close() 

1047 continue 

1048 

1049 # Get the selected DD IDs 

1050 ddis = seldict['dd'].tolist() 

1051 ddsize = ddis.__len__() 

1052 if count == 0: 

1053 ddistart = 0 

1054 

1055 # Create a ddistart list 

1056 ddistartList.append(ddistart) 

1057 ddistart = ddistart + ddsize 

1058 count = count + 1 

1059 

1060 # spw separation axis  

1061 else: 

1062 count = 0 

1063 for k,spws in lociteritems(partedspws): 

1064 if self._msTool is None: 

1065 self._msTool = ms( ) 

1066 self._msTool.open(self._arg['vis'],nomodify=False) 

1067 else: 

1068 self._msTool.reset() 

1069 

1070 try: 

1071 # The dictionary with selected indices 

1072 seldict = self._msTool.msseltoindex(vis=self._arg['vis'],scan=scans,spw=spws, polarization=corr_sel) 

1073 except: 

1074 self._msTool.reset() 

1075 continue 

1076 

1077 # Get the selected DD IDs 

1078 ddis = seldict['dd'].tolist() 

1079 ddsize = ddis.__len__() 

1080 if count == 0: 

1081 ddistart = 0 

1082 

1083 # Create a ddistart list 

1084 ddistartList.append(ddistart) 

1085 ddistart = ddistart + ddsize 

1086 count = count + 1 

1087 

1088 return ddistartList 

1089 

1090 def __selectMS(self): 

1091 """ This method will open the MS and ensure whatever selection criteria 

1092 have been requested are honored. If scanList is not None then it  

1093 is used as the scan selection criteria. 

1094 """ 

1095 

1096 if self._msTool is None: 

1097 self._msTool = ms( ) 

1098 self._msTool.open(self._arg['vis']) 

1099 else: 

1100 self._msTool.reset() 

1101 

1102 # It returns a dictionary if there was any selection otherwise None 

1103 self.__selectionFilter = self.__getSelectionFilter() 

1104 

1105 if self.__selectionFilter is not None: 

1106 self._msTool.msselect(self.__selectionFilter) 

1107 

1108 def __getScanList(self): 

1109 """ This method returns the scan list from the current ms. Be careful 

1110 about having selection already done when you call this. 

1111 """ 

1112 

1113 if self._msTool is None: 

1114 self.__selectMS() 

1115 

1116 scanSummary = self._msTool.getscansummary() 

1117 scanList = [int(scan) for scan in scanSummary] 

1118 

1119 if len(scanList) == 0: 

1120 raise ValueError("No Scans present in the created MS.") 

1121 

1122 scanList.sort() 

1123 return scanList 

1124 

1125 def __getSPWUniqueList(self): 

1126 """ 

1127 Returns a list of spectral windows from the current MS (after 

1128 selection). The list is sorted by SPW ID. The list holds 

1129 integer ID and the IDs are sorted as integers. Note other 

1130 functions will map the ints to strings. 

1131 Selection is applied via __selectMS(). 

1132 """ 

1133 

1134 if self._msTool is None: 

1135 self.__selectMS() 

1136 

1137 # Now get the list of SPWs in the selected MS 

1138 ddInfo = self._msTool.getspectralwindowinfo() 

1139 self.__spwList = [info['SpectralWindowId'] for info in ddInfo.values()] 

1140 

1141 # I wonder why the concern about uniqueness. mstool.getspectralwindowinfo() should 

1142 # not return duplicated SPW IDs... 

1143 # Return a unique sorted list: 

1144 sorted_spws = list(set(self.__spwList)) 

1145 

1146 # Effectively sort by spw ID (as ints) before distributing to sub-MSs 

1147 sorted_spws.sort() 

1148 

1149 return sorted_spws 

1150 

1151 def __getBaselineList(self): 

1152 """ This method returns the baseline list from the current MS. Be careful 

1153 about having selection already done when you call this. 

1154 """ 

1155 

1156 # cumulative baseline selections do not reflect on the msselectedindices() 

1157 if self._msTool is None: 

1158 self.__selectMS() 

1159 

1160 

1161 # If there are any previous antenna selections, use it 

1162 if self._arg['antenna'] != '': 

1163 baselineSelection = {'baseline':self._arg['antenna']} 

1164 try: 

1165 self._msTool.msselect(baselineSelection, onlyparse=False) 

1166 # IMPORTANT: msselectedindices() will always say there are auto-correlation 

1167 # baselines, even when there aren't. In the MMS case, the SubMS creation will 

1168 # issue a MSSelectionNullSelection and not be created.  

1169 baselinelist = self._msTool.msselectedindices()['baselines'] 

1170 except: 

1171 baselinelist = [] 

1172 else: 

1173 md = msmetadata( ) 

1174 md.open(self._arg['vis']) 

1175 baselines = md.baselines() 

1176 md.close() 

1177 import numpy as np 

1178 baselinelist = np.vstack(np.where(np.triu(baselines))).T 

1179 

1180 

1181 return baselinelist.tolist() 

1182 

1183 def __getSelectionFilter(self): 

1184 """ This method takes the list of specified selection criteria and 

1185 puts them into a dictionary. There is a bit of name mangling necessary. 

1186 The pairs are: (msselection syntax, mstransform task syntax). 

1187 """ 

1188 

1189 selectionPairs = [] 

1190 selectionPairs.append(('field','field')) 

1191 selectionPairs.append(('spw','spw')) 

1192 selectionPairs.append(('polarization','correlation')) 

1193 selectionPairs.append(('baseline','antenna')) 

1194 selectionPairs.append(('time','timerange')) 

1195 selectionPairs.append(('scan','scan')) 

1196 selectionPairs.append(('uvdist','uvrange')) 

1197 selectionPairs.append(('scanintent','intent')) 

1198 selectionPairs.append(('observation','observation')) 

1199 return self.__generateFilter(selectionPairs) 

1200 

1201 def __generateFilter(self, selectionPairs): 

1202 """It creates a dictionary of the non-empty selection parameters. 

1203  

1204 Keyword argument: 

1205 selectionPairs -- list with task parameter name = msselection parameter name 

1206 as returned from __getSelectionFilter() 

1207  

1208 It will look at the selection parameters in self_arg. 

1209 """ 

1210 filter = None 

1211 for (selSyntax, argSyntax) in selectionPairs: 

1212 if argSyntax in self._arg and self._arg[argSyntax] != '': 

1213 if filter is None: 

1214 filter = {} 

1215 filter[selSyntax] = self._arg[argSyntax] 

1216 

1217 return filter 

1218 

1219 def __partition(self, lst, n): 

1220 """ This method will split the list lst into "n" almost equal parts 

1221 if lst is none, then we assume an empty list. 

1222 """ 

1223 

1224 if lst is None: 

1225 lst = [] 

1226 

1227 division = len(lst)/float(n) 

1228 

1229 return [ lst[int(round(division * i)): 

1230 int(round(division * (i+1)))] for i in range(int(n))] 

1231 

1232 def __partition1(self, lst, n): 

1233 """ This method will split the list lst into "n" almost equal parts. 

1234 if lst is None, then we assume an empty list. 

1235  

1236 Keyword arguments: 

1237 lst --- spw list 

1238 n --- numsubms 

1239 

1240 It returns a dictionary such as: 

1241 given the selection spw='0,1:10~20,3,4,5' 

1242 rdict = {0: ['0','1'], 1:['3','4','5']} 

1243 """ 

1244 

1245 if lst is None: 

1246 lst = [] 

1247 

1248 # Create a dictionary for the parted spws: 

1249 rdict = {} 

1250 division = len(lst)/float(n) 

1251 for i in range(int(n)): 

1252 part = lst[int(round(division * i)):int(round(division * (i+1)))] 

1253 rdict[i] = part 

1254 

1255 return rdict 

1256 

1257 def __chanSelection(self, spwsel): 

1258 """ Create a dictionary of channel selections. 

1259  

1260 Keyword arguments: 

1261 spwsel --- a string with spw selection 

1262  

1263 It returns a dictionary such as: 

1264 spwsel = '0,1:10~20' 

1265 seldict = {0: {'channels': '', 'spw': '0'},  

1266 1: {'channels': '10~20', 'spw': '1'}} 

1267 """ 

1268 

1269 # Split to get each spw in a list 

1270 if spwsel.__contains__(','): 

1271 spwlist = spwsel.split(',') 

1272 else: 

1273 spwlist = spwsel.split(';') 

1274 

1275 spwid=[] 

1276 chanlist=[] 

1277 # Split to create two lists, one with channels, the other with spwIDs 

1278 for isel in spwlist: 

1279 # Get tail, colon and head 

1280 (s, c, ch) = isel.rpartition(":") 

1281 # Remove any blanks 

1282 s = s.strip(' ') 

1283 c = c.strip(' ') 

1284 ch = ch.strip(' ') 

1285 # If no tail, there was no colon to split. In this case, add the spwID 

1286 if s == "": 

1287 spwid.append(ch) 

1288 chanlist.append('') 

1289 else: 

1290 spwid.append(s) 

1291 chanlist.append(ch) 

1292 

1293 # Create a dictionary 

1294 seldict = {} 

1295 for ns in range(len(spwid)): 

1296 sel = {} 

1297 sel['spw'] = spwid[ns] 

1298 sel['channels'] = chanlist[ns] 

1299 seldict[ns] = sel 

1300 

1301 

1302 return seldict 

1303 

1304 def __createSPWExpression(self, partdict): 

1305 """ Creates the final spw expression that will be sent to the engines. 

1306 This adds back the channel selections to their spw counterparts. 

1307  

1308 Keyword arguments: 

1309 partdict --- dictionary from __partition1, such as: 

1310  

1311 Ex: partdict = {0: ['0','1'], 1:['3','4','5']} 

1312 when selection is spw = '0,1:10~20,3,4,5' 

1313 and effective number of subMSs is 2. 

1314 """ 

1315 

1316 # Create a dictionary of the spw/channel selections 

1317 # Ex: seldict = {0: {'channels': '', 'spw': '0'},  

1318 # 1: {'channels': '10~20', 'spw': '1'}} 

1319 seldict = self.__chanSelection(self.__spwSelection) 

1320 

1321 newdict = copy.copy(partdict) 

1322 

1323 # Match the spwId of partdict with those from seldict 

1324 # For the matches that contain channel selection in seldict, 

1325 # Add them to the spwID string in partdict 

1326 for keys,vals in seldict.items(): 

1327 for k,v in partdict.items(): 

1328 for i in range(len(v)): 

1329# if v[i] == seldict[keys]['spw'] and seldict[keys]['channels'] != '': 

1330# if v[i] == vals['spw'] and vals['channels'] != '': 

1331 # matches, now edit pardict 

1332 if v[i] == vals['spw']: 

1333 if vals['channels'] != '': 

1334 spwexpr = vals['spw'] + ':' + vals['channels'] 

1335 else: 

1336 # spwexpr = seldict[keys]['spw'] + ':' + seldict[keys]['channels'] 

1337 spwexpr = vals['spw'] 

1338 newdict[k][i] = spwexpr 

1339 

1340 # We now have a new dictionary of the form: 

1341 # newdict = {0: ['0', '1:10~20'], 1: ['3', '4','5']} 

1342 # We want it to be: 

1343 # newdict = {0: "0,1:10~20",1: "3, 4,5"} 

1344 

1345 # Add a comma separator for each expression making 

1346 # a single string for each key 

1347 for k,v in newdict.items(): 

1348 spwstr = "" 

1349 for s in range(len(v)): 

1350 spwstr = spwstr + v[s] + ',' 

1351 newdict[k] = spwstr.rstrip(',') 

1352 

1353 casalog.post('Dictionary of spw expressions is: ','DEBUG') 

1354 casalog.post ('%s'%newdict,'DEBUG') 

1355 

1356 return newdict 

1357 

1358 def getChanAvgParamName(self): 

1359 """ Get the channel average bin parameter name. 

1360 It will return a string with the parameter name, based on 

1361 the calling task. 

1362 """ 

1363 

1364 casalog.origin("ParallelDataHelper") 

1365 

1366 if self.__taskname == None: 

1367 return None 

1368 

1369 if self.__taskname == 'mstransform': 

1370 return 'chanbin' 

1371 elif self.__taskname == 'split' or self.__taskname == 'split2': 

1372 return 'width' 

1373 

1374 return None 

1375 

1376 def validateChanBin(self): 

1377 """ Check if channel average bin parameter has the same 

1378 size of the spw selection. 

1379  

1380 Returns True if parameter is valid or False otherwise. 

1381 This method must use the local class self.__args parameters. 

1382 TBD: make it a static method 

1383 """ 

1384 

1385 casalog.origin("ParallelDataHelper") 

1386 

1387 retval = True 

1388 

1389 # Get the parameter name, which depends on the task calling this class 

1390 parname = self.getChanAvgParamName() 

1391 casalog.post('Channel average parameter is called %s'%parname,'DEBUG1') 

1392 if parname == None: 

1393 retval = False 

1394 

1395 elif parname in self.__args: 

1396 fblist = self.__args[parname] 

1397 if isinstance(fblist,list): 

1398 

1399 if len(fblist) > 1: 

1400 if self.__spwList == None: 

1401 msTool = ms( ) 

1402 msTool.open(self.__args['vis']) 

1403 spwsel = self.__args['spw'] 

1404 msTool.msselect({'spw':spwsel}) 

1405 ddInfo = msTool.getspectralwindowinfo() 

1406 self.__spwList = [info['SpectralWindowId'] for info in ddInfo.values()] 

1407 msTool.close() 

1408 

1409 if len(self.__spwList) != len(fblist): 

1410 retval = False 

1411 raise ValueError('Number of %s is different from the number of spw' %parname) 

1412 

1413 

1414 return retval 

1415 

1416 def defaultRegridParams(self): 

1417 """ Reset the default values of the regridms transformation parameters based on the mode. 

1418 Specific for mstransform task. 

1419 This method must use the local class self.__args parameters. 

1420 TBD: make it a static method 

1421 """ 

1422 

1423 casalog.origin("ParallelDataHelper") 

1424 

1425 if self.__args['mode'] == 'channel' or self.__args['mode'] == 'channel_b': 

1426 self.__args['start'] = str(self.__args['start']) 

1427 self.__args['width'] = str(self.__args['width']) 

1428 

1429 elif self.__args['mode'] == 'velocity': 

1430 restfreq = self.__args['restfreq'] 

1431 if restfreq == "" or restfreq.isspace(): 

1432 raise ValueError("Parameter restfreq must be set when mode='velocity'") 

1433 

1434 if self.__args['start'] == 0: 

1435 self.__args['start'] = '' 

1436 

1437 if self.__args['width'] == 1: 

1438 self.__args['width'] = '' 

1439 

1440 

1441 # Check if the parameter has valid velocity units 

1442 if not self.__args['start'] == '': 

1443 if (_qa.quantity(self.__args['start'])['unit'].find('m/s') < 0): 

1444 raise TypeError('Parameter start does not have valid velocity units') 

1445 

1446 if not self.__args['width'] == '': 

1447 if (_qa.quantity(self.__args['width'])['unit'].find('m/s') < 0): 

1448 raise TypeError('Parameter width does not have valid velocity units') 

1449 

1450 elif self.__args['mode'] == 'frequency': 

1451 if self.__args['start'] == 0: 

1452 self.__args['start'] = '' 

1453 if self.__args['width'] == 1: 

1454 self.__args['width'] = '' 

1455 

1456 # Check if the parameter has valid frequency units 

1457 if not self.__args['start'] == '': 

1458 if (_qa.quantity(self.__args['start'])['unit'].find('Hz') < 0): 

1459 raise TypeError('Parameter start does not have valid frequency units') 

1460 

1461 if not self.__args['width'] == '': 

1462 if (_qa.quantity(self.__args['width'])['unit'].find('Hz') < 0): 

1463 raise TypeError('Parameter width does not have valid frequency units') 

1464 

1465 start = self.__args['start'] 

1466 width = self.__args['width'] 

1467 

1468 return start, width 

1469 

1470 def postExecution(self): 

1471 """ This method overrides the postExecution method of ParallelTaskHelper, 

1472 in which case we probably need to generate the output reference MS. 

1473 """ 

1474 

1475 casalog.origin("ParallelDataHelper") 

1476 if self._msTool: 

1477 self._msTool.close() 

1478 

1479 # We created a data directory and many SubMSs, 

1480 # now build the reference MS. The outputList is a 

1481 # dictionary of the form: 

1482 # {'path/outputvis.data/SUBMSS/outputvis.0000.ms':True, 

1483 # 'path/outuputvis.data/SUBMSS/outputvis.0001.ms':False} 

1484 outputList = {} 

1485 

1486# if (ParallelTaskHelper.getBypassParallelProcessing()==1): 

1487 if (self._cluster == None): 

1488 # This is the list of output SubMSs 

1489 outputList = self._sequential_return_list 

1490 self._sequential_return_list = {} 

1491 elif (self._cluster != None): 

1492 command_response_list = self._cluster.get_command_response(self._command_request_id_list,True,True) 

1493 # Format list in the form of vis dict 

1494 for command_response in command_response_list: 

1495 outvis = command_response['parameters']['outputvis'] 

1496 outputList[outvis] = command_response['successful'] 

1497 

1498 casalog.post('postExecution dict of commands and success values: {}'.format( 

1499 outputList), 'DEBUG') 

1500 

1501 # List of failed MSs. TBD 

1502 nFailures = [] 

1503 

1504 subMSList = [] 

1505 

1506 nFailures = [v for v in outputList.values() if v == False] 

1507 

1508 for subMS in outputList: 

1509 # Only use the successful (as per command response fields) output MSs 

1510 if outputList[subMS]: 

1511 subMSList.append(subMS) 

1512 

1513 subMSList.sort() 

1514 

1515 if len(subMSList) == 0: 

1516 casalog.post("Error: no subMSs were successfully created.", 'WARN') 

1517 return False 

1518 

1519 # When separationaxis='scan' there is no need to give ddistart.  

1520 # The tool looks at the whole spw selection and 

1521 # creates the indices from it. After the indices are worked out,  

1522 # it applies MS selection. We do not need to consolidate either. 

1523 

1524 # If axis is spw, give a list of the subMSs 

1525 # that need to be consolidated. This list is pre-organized 

1526 # inside the separation functions above. 

1527 

1528 # Only when input is MS or MS-like and createmms=True 

1529 # Only partition and mstransform have the createmms parameter 

1530 if 'createmms' in self._arg and self._arg['createmms'] == True and self._arg['separationaxis'] == 'spw': 

1531# if (self._arg['separationaxis'] == 'spw' or  

1532# self._arg['separationaxis'] == 'auto'):  

1533# if (self._arg['separationaxis'] == 'spw'):  

1534 

1535 casalog.post('Consolidate the sub-tables') 

1536 

1537 toUpdateList = list(self.__ddidict.values()) 

1538 

1539 toUpdateList.sort() 

1540 casalog.post('List to consolidate %s'%toUpdateList,'DEBUG') 

1541 

1542 # Consolidate the spw sub-tables to take channel selection 

1543 # or averages into account. 

1544 mtlocal1 = mstransformer() 

1545 try: 

1546 mtlocal1.mergespwtables(toUpdateList) 

1547 mtlocal1.done() 

1548 except Exception: 

1549 mtlocal1.done() 

1550 casalog.post('Cannot consolidate spw sub-tables in MMS','SEVERE') 

1551 return False 

1552 

1553 if len(nFailures) > 0: 

1554 casalog.post('%s subMSs failed to be created. This is not an error, if due to selection when creating a Multi-MS'%len(nFailures)) 

1555 # need to rename/re-index the subMSs 

1556 newList = copy.deepcopy(subMSList) 

1557 idx = 0 

1558 for subms in newList: 

1559 suffix = re.findall(r".\d{4}.ms",subms) 

1560# newms = subms.rpartition(suffix[-1])[0]  

1561 newms = subms[:-len(suffix[-1])] 

1562 newms = newms+'.%04d.ms'%idx 

1563 os.rename(subms,newms) 

1564 newList[idx] = newms 

1565 idx += 1 

1566 

1567 

1568 if len(subMSList) == len(newList): 

1569 subMSList = newList 

1570 

1571 # Get the first subMS to be the reference when 

1572 # copying the sub-tables to the other subMSs  

1573 mastersubms = subMSList[0] 

1574 

1575 # Get list of all subtables in a subms 

1576 thesubtables = ph.getSubtables(mastersubms) 

1577 

1578 # Remove the SOURCE and HISTORY tables, which will be the only copied. 

1579 # All other sub-tables will be linked to first subms 

1580 thesubtables.remove('SOURCE') 

1581 thesubtables.remove('HISTORY') 

1582 

1583 subtabs_to_omit = thesubtables 

1584 

1585 # Parallel axis to write to table.info of MMS 

1586 # By default take the one from the input MMS 

1587 parallel_axis = ph.axisType(self.__args['vis']) 

1588 if 'createmms' in self._arg and self._arg['createmms'] == True: 

1589 parallel_axis = self._arg['separationaxis'] 

1590 

1591 if parallel_axis == 'auto' or parallel_axis == 'both': 

1592 parallel_axis = 'scan,spw' 

1593 

1594 # Copy sub-tables from first subMS to the others. The tables in 

1595 # subtabs_to_omit are linked instead of copied. 

1596 casalog.post("Finalizing MMS structure") 

1597 ph.makeMMS(self._arg['outputvis'], subMSList, 

1598 True, # copy subtables (will copy only the SOURCE and HISTORY tables) 

1599 subtabs_to_omit, # omitting these 

1600 parallel_axis 

1601 ) 

1602 

1603 thesubmscontainingdir = os.path.dirname(subMSList[0].rstrip('/')) 

1604 

1605 shutil.rmtree(thesubmscontainingdir) 

1606 

1607 # Sanity check on the just created MMS 

1608 # check for broken symlinks 

1609 try: 

1610 with open(os.devnull, 'w') as null: 

1611 p = subprocess.Popen(['find', '-L', self._arg['outputvis'], '-type', 'l'], 

1612 universal_newlines=True, stdout=subprocess.PIPE, stderr=null) 

1613 o, e = p.communicate() 

1614 if o: 

1615 casalog.post('The new MMS contain broken symlinks. Please verify', 'SEVERE') 

1616 casalog.post(o, 'SEVERE') 

1617 return False 

1618 except: 

1619 pass 

1620 

1621 return True