Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_importasap.py: 92%
49 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1import os
2import re
4from casatasks import casalog
5from casatools import agentflagger, calibrater, ms, singledishms
7from . import sdutil
8from .mstools import write_history
10mysdms = singledishms()
11mycb = calibrater()
12myms = ms()
15@sdutil.sdtask_decorator
16def importasap(infile=None, outputvis=None, flagbackup=None, overwrite=None, parallel=None):
17 """
18 """
20 try:
21 if infile is None or outputvis is None:
22 raise RuntimeError('Error: infile and outputvis must be specified.')
24 # default value
25 if flagbackup is None:
26 flagbackup = True
28 if overwrite is None:
29 overwrite = False
31 # basic check
32 if os.path.exists(outputvis) and not overwrite:
33 raise RuntimeError('%s exists.' % (outputvis))
35 if not _is_scantable(infile):
36 raise RuntimeError('%s is not a valid Scantable.' % (infile))
38 # import
39 status = mysdms.importasap(infile, outputvis, parallel)
41 if status:
42 # flagversions file must be deleted
43 flagversions = outputvis.rstrip('/') + '.flagversions'
44 if os.path.exists(flagversions):
45 os.system('rm -rf %s' % (flagversions))
47 # initialize weights using cb tool
48 mycb.open(outputvis, compress=False, addcorr=False, addmodel=False)
49 mycb.initweights(wtmode='nyq')
51 # create flagbackup file if user requests it
52 if flagbackup:
53 aflocal = agentflagger()
54 aflocal.open(outputvis)
55 aflocal.saveflagversion(
56 'Original',
57 comment='Original flags at import into CASA using importasap',
58 merge='save')
59 aflocal.done()
60 else:
61 raise RuntimeError('Failure in importasap')
63 # Write history to output MS
64 param_names = importasap.__code__.co_varnames[:importasap.__code__.co_argcount]
65 vars = locals()
66 param_vals = [vars[p] for p in param_names]
67 write_history(myms, outputvis, 'importasap', param_names,
68 param_vals, casalog)
70 finally:
71 mycb.close()
74def _is_scantable(filename):
75 """Check if given data is Scantable or not."""
76 ret = False
77 if os.path.isdir(filename) and os.path.exists(filename + '/table.info') \
78 and os.path.exists(filename + '/table.dat'):
79 with open(filename + '/table.info') as f:
80 lines = f.readline()
81 f.close()
82 match_pattern = '^Type = (Scantable)? *$'
83 ret = re.match(match_pattern, lines) is not None
84 return ret