Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_sdtimeaverage.py: 94%

51 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1import inspect 

2from types import CodeType 

3 

4import numpy 

5 

6from casatasks import casalog 

7from casatools import quanta 

8 

9from . import sdutil 

10 

11qa = quanta() 

12 

13 

14@sdutil.sdtask_decorator 

15def sdtimeaverage( 

16 infile, 

17 datacolumn, 

18 field, 

19 spw, 

20 timerange, 

21 scan, 

22 antenna, 

23 timebin, 

24 timespan, 

25 outfile): 

26 # When 'all'(default) or '' is specified, make timebin to cover all the data. 

27 if timebin.upper() in ['ALL', '']: 

28 timebin = set_timebin_all() + 's' 

29 

30 # Switch Alternative Column if needed. 

31 active_datacolumn = use_alternative_column(infile, datacolumn) 

32 

33 # Antenna ID (add extra &&& if needed) This is Single Dish specific. 

34 if (len(antenna) != 0) and (not('&' in antenna)): 

35 antenna = antenna + '&&&' 

36 

37 # 'scan,state' Warning 

38 # (unexpected result warning) 

39 if ('scan' in timespan) and ('state' in timespan): 

40 # WARN msg, to explain NRO specific issue. 

41 msg = '\n'.join( 

42 ["Explicitly both 'scan' and 'state' were specified in 'timespan'.", 

43 " If 'state' distinguishes OBSERVE_TARGET#ON_SOURCE / OBSERVE_TARGET#OFF_SOURCE,", 

44 " these two states are mixed, and unexpectedly averaged results might be generated.", 

45 "(Suggestion) Please specify timespan = 'scan'", 

46 " to separate OBSERVE_TARGET#ON_SOURCE and OBSERVE_TARGET#OFF_SOURCE."]) 

47 casalog.post(msg, 'WARN') 

48 

49 # (Note) When timespan='' is specified. 

50 # timespan will be directly posted to mstransform. 

51 

52 # Convert to check timebin 

53 tbin = qa.convert(qa.quantity(timebin), 's')['value'] 

54 

55 # Time average, once Enable. 

56 do_timeaverage = True 

57 

58 # Check timebin 

59 if tbin < 0: # Error, raise Exception. 

60 raise ValueError( 

61 "Parameter timebin must be >= '0s' to do time averaging") 

62 elif tbin == 0: # No averaging, when tbin == 0 

63 msg = 'Parameter timebin equals zero. No averaging will be performed.' 

64 casalog.post(msg, 'WARN') 

65 do_timeaverage = False 

66 

67 # extra parameter for do_mst 

68 ext_config = {'do_timeaverage': do_timeaverage} 

69 

70 caller: CodeType = inspect.currentframe().f_code 

71 

72 # Select Data and make Average. 

73 sdutil.do_mst( 

74 infile=infile, 

75 datacolumn=active_datacolumn, 

76 field=field, 

77 spw=spw, 

78 timerange=timerange, 

79 scan=scan, 

80 antenna=antenna, 

81 timebin=timebin, 

82 timespan=timespan, 

83 outfile=outfile, 

84 intent='', 

85 caller=caller, 

86 ext_config=ext_config) 

87 

88 # History 

89 sdutil.add_history(caller, casalog, outfile) 

90 

91 

92def use_alternative_column(infile, datacolumn): 

93 """Alternatively use datacolumn if the specified column does not exist. 

94 

95 In case 'float_data' does not exist, sdtimeaverage attempt to use 'data' 

96 and vice versa. (For user's convenience) 

97 """ 

98 # obtain the existence of data-column on specified MS. 

99 ex_float_data, ex_data = check_column(infile) 

100 

101 # alter datacolumn if available 

102 if (datacolumn == 'float_data'): # Change 'float_data' to 'data' 

103 if (not ex_float_data) and (ex_data): 

104 datacolumn = 'data' 

105 msg = 'No FLOAT_DATA column. DATA column will be used alternatively.' 

106 casalog.post(msg, 'INFO') 

107 elif (datacolumn == 'data'): # Change 'data' to 'float_data' 

108 if (ex_float_data) and (not ex_data): 

109 datacolumn = 'float_data' 

110 msg = 'No DATA column. FLOAT_DATA column will be used alternatively.' 

111 casalog.post(msg, 'INFO') 

112 

113 return datacolumn 

114 

115 

116def check_column(msname): 

117 """Check the specified column if it exists.""" 

118 with sdutil.table_manager(msname) as tb: 

119 columnNames = tb.colnames() 

120 exist_float_data = 'FLOAT_DATA' in columnNames 

121 exist_data = 'DATA' in columnNames 

122 return exist_float_data, exist_data 

123 

124 

125def set_timebin_all(): 

126 """Synthesize timebin. 

127 

128 assign very large value to cover 'all'. 

129 """ 

130 timebin = numpy.finfo(float).max 

131 return str(timebin)