Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_nrobeamaverage.py: 89%
65 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1import datetime
2import inspect
3import os
4import shutil
5from types import CodeType
7from casatasks import casalog
8from casatools import quanta
10from . import sdutil
12"""
13The following code is based on the mstransform code, with
14task name and some task parameters modified.
15To minimise code modification, the parameters used by
16mstransform but not by nrobeamaverage are kept as much as
17possible, and the default values for mstransform are given
18to them.
19(CAS-12475, 2019/6/7 WK)
20"""
@sdutil.sdtask_decorator
def nrobeamaverage(
        infile,
        datacolumn,
        field,
        spw,
        timerange,
        scan,
        beam,
        timebin,
        outfile):
    """Relabel the selected beams of the ON-source rows and write ``outfile``.

    The task works in three steps:

    1. ``sdutil.do_mst`` copies the selected data of ``infile`` into a
       temporary MS (no time averaging, ``timebin='0s'``).
    2. In the temporary MS, every row whose ``ANTENNA1`` is one of the
       selected beams and whose ``STATE_ID`` points at the
       ``OBSERVE_TARGET#ON_SOURCE`` state gets the smallest selected beam ID
       written to both ``ANTENNA1`` and ``ANTENNA2``.
    3. ``sdutil.do_mst`` is run again on the temporary MS to produce
       ``outfile``; time averaging is requested when ``timebin`` is positive.

    The temporary MS is removed on exit, whether or not the task succeeded.

    Args:
        infile: Path of the input MS.
        datacolumn: Data column name, forwarded to ``sdutil.do_mst``.
        field: Field selection, applied in step 1.
        spw: Spectral window selection, applied in step 1.
        timerange: Time range selection, applied in step 1.
        scan: Scan selection, applied in step 1.
        beam: Beam selection (string or list), parsed by ``get_beamid``.
        timebin: Averaging bin width as a quantity string convertible to
            seconds; a value of zero disables time averaging.
        outfile: Path of the output MS.

    Raises:
        Exception: If no ``OBSERVE_TARGET#ON_SOURCE`` state exists in the
            selected data, or if ``timebin`` is negative.
    """
    # Bind the temporary MS name *before* entering ``try``: the ``finally``
    # clause reads it unconditionally, so a failure while building the name
    # must not be masked by an UnboundLocalError raised during cleanup.
    tmpfile = 'tmp-nrobeamaverage-' + os.path.basename(infile.rstrip('/')) + '-' + \
        "{0:%Y%m%d%H%M%S.%f}".format(datetime.datetime.now()) + '.ms'

    try:
        caller: CodeType = inspect.currentframe().f_code

        # data selection
        sdutil.do_mst(
            infile=infile,
            datacolumn=datacolumn,
            field=field,
            spw=spw,
            timerange=timerange,
            scan=scan,
            antenna='',
            timebin='0s',
            timespan='scan',
            outfile=tmpfile,
            intent='',
            caller=caller,
            ext_config={})

        # open tmpfile and rewrite antenna column of the ON spectra using beam
        # locate the first STATE row describing ON-source data
        idx_on = None
        with sdutil.table_manager(os.path.join(tmpfile, 'STATE')) as tb:
            for state_id, obs_mode in enumerate(tb.getcol('OBS_MODE')):
                if obs_mode == 'OBSERVE_TARGET#ON_SOURCE':
                    idx_on = state_id
                    break
        if idx_on is None:
            raise Exception('ON_SOURCE data not found.')

        # the number of ANTENNA rows is used as the number of beams
        with sdutil.table_manager(os.path.join(tmpfile, 'ANTENNA')) as tb:
            num_beams = len(tb.getcol('NAME'))
        _beam, min_beamid = get_beamid(beam, num_beams)

        with sdutil.table_manager(tmpfile, nomodify=False) as tb:
            acol = tb.getcol('ANTENNA1')
            scol = tb.getcol('STATE_ID')
            for row, (antenna_id, state_id) in enumerate(zip(acol, scol)):
                if (antenna_id in _beam) and (state_id == idx_on):
                    acol[row] = min_beamid
            # the same relabelled column is written to both antenna columns
            tb.putcol('ANTENNA1', acol)
            tb.putcol('ANTENNA2', acol)

        qa = quanta()
        tbin = qa.convert(qa.quantity(timebin), 's')['value']
        if tbin < 0:
            raise Exception("Parameter timebin must be > '0s' to do time averaging")
        do_timeaverage = (tbin > 0)

        ext_config = {'do_timeaverage': do_timeaverage}

        # time averaging
        sdutil.do_mst(
            infile=tmpfile,
            datacolumn=datacolumn,
            field='',
            spw='',
            timerange='',
            scan='',
            antenna='',
            timebin=timebin,
            timespan="scan",
            outfile=outfile,
            intent='',
            caller=caller,
            ext_config=ext_config)

        # History
        sdutil.add_history(caller, casalog, outfile)

    finally:
        # delete tmpfile
        if os.path.isdir(tmpfile):
            shutil.rmtree(tmpfile)
def get_beamid(beam, num_beams):
    """Parse the ``beam`` selection into integer beam IDs.

    Args:
        beam: Either a comma-separated string of beam IDs (e.g. ``'0,2'``)
            or a list whose items are convertible with ``int()``. An empty
            string, or a list holding only an empty string, selects every
            beam ``0 .. num_beams - 1``.
        num_beams: Number of beams available; only used to expand the
            default (empty) selection.

    Returns:
        A tuple ``(beam_ids, min_beamid)``: the list of selected integer beam
        IDs in the order given, and the smallest of them. A list passed as
        ``beam`` is left unmodified; a new list is returned.

    Raises:
        ValueError: If ``beam`` is neither a string nor a list, if an item
            is not a valid integer literal, or if the selection is empty.
        TypeError: If an item has a type ``int()`` cannot convert.
    """
    try:
        if isinstance(beam, str):
            tokens = beam.strip().split(",")
        elif isinstance(beam, list):
            # work on a copy so the caller's list is never modified
            tokens = list(beam)
        else:
            raise ValueError('the parameter beam must be list or string.')

        # the default case (beam='')
        if (len(tokens) == 1) and (tokens[0] == ''):
            beam_ids = list(range(num_beams))
        else:
            beam_ids = [int(token) for token in tokens]

        if not beam_ids:
            raise ValueError('no beam ID is selected.')
    except (TypeError, ValueError) as e:
        # Log, then propagate: continuing with unparsed beam IDs would
        # silently select nothing or fail later with an unrelated error.
        casalog.post("Error \'%s\' input beam ID is invalid" % (e))
        raise

    return beam_ids, min(beam_ids)