Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_importnro.py: 85%
47 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1import datetime
2import os
3import sys
4import shutil
6from casatasks import casalog
7from casatools import calibrater, ms, singledishms
9from . import sdutil
10from .mstools import write_history
12mysdms = singledishms()
13mycb = calibrater()
14myms = ms()
17@sdutil.sdtask_decorator
18def importnro(infile=None, outputvis=None, overwrite=None, parallel=None):
19 """
20 """
21 status = True
23 try:
24 outputvis_temp = outputvis + '-backup-' + datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
26 if os.path.exists(outputvis):
27 if overwrite:
28 os.rename(outputvis, outputvis_temp)
29 else:
30 raise RuntimeError('%s exists.' % (outputvis))
32 if not _is_nostar(infile):
33 raise RuntimeError('%s is not a valid NOSTAR data.' % (infile))
35 status = mysdms.importnro(infile, outputvis, parallel)
37 if status:
38 # initialize weights using cb tool
39 mycb.open(outputvis, compress=False, addcorr=False, addmodel=False)
40 mycb.initweights(wtmode='nyq')
41 if os.path.exists(outputvis_temp):
42 shutil.rmtree(outputvis_temp)
43 else:
44 if os.path.exists(outputvis):
45 shutil.rmtree(outputvis)
46 if os.path.exists(outputvis_temp):
47 os.rename(outputvis_temp, outputvis)
48 raise RuntimeError('import failed.')
50 # Write parameters to HISTORY table of MS
51 param_names = importnro.__code__.co_varnames[:importnro.__code__.co_argcount]
52 vars = locals()
53 param_vals = [vars[p] for p in param_names]
54 write_history(myms, outputvis, 'importnro', param_names,
55 param_vals, casalog)
57 finally:
58 if status:
59 mycb.close()
62def _is_nostar(filename):
63 """Check if given data is NOSTAR or not."""
64 ret = False
65 if os.path.getsize(filename) >= 15136: # size of observation header
66 with open(filename, 'rb') as f:
67 if f.read(8).replace(b'\x00', b'') == b'RW':
68 ret = (f.read(15136 - 8 + 4)[-4:].replace(b'\x00', b'').decode( ) == 'LS')
69 f.close()
70 return ret