Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_nrobeamaverage.py: 89%

65 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1import datetime 

2import inspect 

3import os 

4import shutil 

5from types import CodeType 

6 

7from casatasks import casalog 

8from casatools import quanta 

9 

10from . import sdutil 

11 

12""" 

13The following code is based on the mstransform code, with 

14task name and some task parameters modified. 

15To minimise code modification, the parameters used by 

16mstransform but not by nrobeamaverage are kept as much as 

17possible, and the default values for mstransform are given 

18to them. 

19(CAS-12475, 2019/6/7 WK) 

20""" 

21 

22 

@sdutil.sdtask_decorator
def nrobeamaverage(
        infile,
        datacolumn,
        field,
        spw,
        timerange,
        scan,
        beam,
        timebin,
        outfile):
    """Merge the selected beams of the ON-source rows and write ``outfile``.

    The work is done in three steps:

    1. ``sdutil.do_mst`` is called on ``infile`` with the given data
       selection and ``timebin='0s'``, writing a temporary data set in the
       current working directory.
    2. In the temporary data set, every row whose ``STATE_ID`` equals the
       index of the first ``STATE`` row with ``OBS_MODE`` equal to
       ``'OBSERVE_TARGET#ON_SOURCE'`` and whose ``ANTENNA1`` is one of the
       selected beam IDs gets ``ANTENNA1`` rewritten to the smallest
       selected beam ID.  The resulting column is written to both
       ``ANTENNA1`` and ``ANTENNA2``.
    3. ``sdutil.do_mst`` is called on the temporary data set with
       ``timebin`` and no further selection, writing ``outfile``; then
       ``sdutil.add_history`` is called for ``outfile``.

    The temporary data set is removed in all cases.

    Parameters
    ----------
    infile : str
        Input data set; its base name is also used in the temporary name.
    datacolumn, field, spw, timerange, scan
        Forwarded unchanged to the first ``sdutil.do_mst`` call
        (``datacolumn`` is forwarded to the second call as well).
    beam : str or list
        Beam selection, interpreted by ``get_beamid``.
    timebin : str
        Quantity string that ``quanta`` can convert to seconds.  A value of
        zero disables time averaging (``do_timeaverage`` is set to False).
    outfile : str
        Output data set name passed to the second ``sdutil.do_mst`` call.

    Raises
    ------
    Exception
        If no ``STATE`` row has ``OBS_MODE`` ``'OBSERVE_TARGET#ON_SOURCE'``,
        or if ``timebin`` converts to a negative number of seconds.
    """
    # The temporary name is bound *before* the ``try`` so that the cleanup in
    # ``finally`` can always refer to it.  If building the name itself fails,
    # the real error propagates instead of being masked by an
    # UnboundLocalError raised from the cleanup code.
    tmpfile = 'tmp-nrobeamaverage-' + os.path.basename(infile.rstrip('/')) + '-' + \
        "{0:%Y%m%d%H%M%S.%f}".format(datetime.datetime.now()) + '.ms'

    # Code object of this function; handed to the sdutil helpers below.
    caller: CodeType = inspect.currentframe().f_code

    try:
        # data selection
        sdutil.do_mst(
            infile=infile,
            datacolumn=datacolumn,
            field=field,
            spw=spw,
            timerange=timerange,
            scan=scan,
            antenna='',
            timebin='0s',
            timespan='scan',
            outfile=tmpfile,
            intent='',
            caller=caller,
            ext_config={})

        # open tmpfile and rewrite antenna column of the ON spectra using beam
        idx_on = None
        with sdutil.table_manager(os.path.join(tmpfile, 'STATE')) as tb:
            ocol = tb.getcol('OBS_MODE')
            # Only the first matching STATE row is used.
            for i in range(len(ocol)):
                if ocol[i] == 'OBSERVE_TARGET#ON_SOURCE':
                    idx_on = i
                    break
        if idx_on is None:
            raise Exception('ON_SOURCE data not found.')

        # The number of ANTENNA rows is used as the number of beams.
        with sdutil.table_manager(os.path.join(tmpfile, 'ANTENNA')) as tb:
            num_beams = len(tb.getcol('NAME'))
        _beam, min_beamid = get_beamid(beam, num_beams)

        with sdutil.table_manager(tmpfile, nomodify=False) as tb:
            acol = tb.getcol('ANTENNA1')
            scol = tb.getcol('STATE_ID')
            for i in range(len(acol)):
                if (acol[i] in _beam) and (scol[i] == idx_on):
                    acol[i] = min_beamid
            # The same rewritten column is stored as both antenna columns.
            tb.putcol('ANTENNA1', acol)
            tb.putcol('ANTENNA2', acol)

        qa = quanta()
        tbin = qa.convert(qa.quantity(timebin), 's')['value']
        if tbin < 0:
            raise Exception("Parameter timebin must be > '0s' to do time averaging")
        do_timeaverage = (tbin > 0)

        ext_config = {'do_timeaverage': do_timeaverage}

        # time averaging
        sdutil.do_mst(
            infile=tmpfile,
            datacolumn=datacolumn,
            field='',
            spw='',
            timerange='',
            scan='',
            antenna='',
            timebin=timebin,
            timespan="scan",
            outfile=outfile,
            intent='',
            caller=caller,
            ext_config=ext_config)

        # History
        sdutil.add_history(caller, casalog, outfile)

    finally:
        # delete tmpfile
        if os.path.isdir(tmpfile):
            shutil.rmtree(tmpfile)

113 

114 

def get_beamid(beam, num_beams):
    """Parse a beam selection and return the beam IDs and the smallest one.

    Parameters
    ----------
    beam : str, list or tuple
        Either a comma-separated string of beam IDs (e.g. ``'0,2'``) or a
        sequence of values convertible with ``int``.  An empty string, or a
        sequence holding only an empty string, selects every beam
        ``0 .. num_beams - 1``.
    num_beams : int
        Number of available beams; only used for the default selection.

    Returns
    -------
    tuple
        ``(beam_ids, min_beamid)`` where ``beam_ids`` is a new list of ints
        in the order given and ``min_beamid`` is its minimum.

    Raises
    ------
    ValueError
        If ``beam`` is not a string, list or tuple, if an element cannot be
        converted to an integer, or if the selection is empty.
    TypeError
        If an element has a type ``int`` cannot convert.

    The error is posted to ``casalog`` before it is re-raised.  The argument
    passed in as ``beam`` is never modified.
    """
    try:
        if isinstance(beam, str):
            tokens = beam.strip().split(",")
        elif isinstance(beam, (list, tuple)):
            # Work on a copy so the caller's sequence is left untouched.
            tokens = list(beam)
        else:
            raise ValueError('the parameter beam must be list or string.')

        # the default case (beam='')
        if (len(tokens) == 1) and (tokens[0] == ''):
            beam_ids = list(range(num_beams))
        else:
            beam_ids = [int(token) for token in tokens]

        # An empty selection has no minimum; report it as invalid input
        # rather than failing later with an unrelated IndexError.
        if not beam_ids:
            raise ValueError('no beam ID is selected.')
    except Exception as e:
        casalog.post("Error \'%s\' input beam ID is invalid" % (e))
        # Continuing with a partially converted selection would either crash
        # further down or silently select nothing, so propagate the error.
        raise

    return beam_ids, min(beam_ids)