Coverage for /home/casatest/venv/lib/python3.12/site-packages/casatasks/private/task_hanningsmooth.py: 18%

73 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-21 07:43 +0000

1import os 

2import shutil 

3import string 

4import copy 

5import math 

6import numpy 

7from typing import Optional, List, Union 

8 

9from .mstools import write_history 

10from casatools import table, ms, mstransformer 

11from casatasks import casalog 

12from .parallel.parallel_data_helper import ParallelDataHelper 

13 

14_tb = table() 

15 

16def hanningsmooth(vis=None, 

17 outputvis=None, 

18 keepmms=None, 

19 field=None, 

20 spw=None, 

21 scan=None, 

22 antenna=None, 

23 correlation=None, 

24 timerange=None, 

25 intent=None, 

26 array=None, 

27 uvrange=None, 

28 observation=None, 

29 feed=None, 

30 smooth_spw : Optional[Union[str,List[int], int]] = None, 

31 datacolumn=None, 

32 ): 

33 

34 """Hanning smooth frequency channel data to remove Gibbs ringing 

35 

36 """ 

37 

38 casalog.origin('hanningsmooth') 

39 

40 

41 # Initiate the helper class  

42 pdh = ParallelDataHelper("hanningsmooth", locals()) 

43 

44 # Validate input and output parameters 

45 pdh.setupIO() 

46 

47 # Input vis is an MMS 

48 if pdh.isMMSAndNotServer(vis) and keepmms: 

49 

50 if not pdh.validateInputParams(): 

51 raise Exception('Unable to continue with MMS processing') 

52 

53 pdh.setupCluster('hanningsmooth') 

54 

55 # Execute the jobs 

56 pdh.go() 

57 return 

58 

59 mslocal = ms() 

60 

61 # Actual task code starts here 

62 try: 

63 mtlocal = mstransformer() 

64 

65 # Gather all the parameters in a dictionary.  

66 config = {} 

67 

68 config = pdh.setupParameters(inputms=vis, outputms=outputvis, field=field, 

69 spw=spw, array=array, scan=scan, antenna=antenna, correlation=correlation, 

70 uvrange=uvrange,timerange=timerange, intent=intent, observation=observation, 

71 feed=feed) 

72 

73 

74 # Check if CORRECTED column exists, when requested 

75 datacolumn = datacolumn.upper() 

76 if datacolumn == 'CORRECTED': 

77 _tb.open(vis) 

78 if 'CORRECTED_DATA' not in _tb.colnames(): 

79 casalog.post('Input CORRECTED_DATA does not exist. Will use DATA','WARN') 

80 datacolumn = 'DATA' 

81 _tb.close() 

82 

83 casalog.post('Will use datacolumn = %s'%datacolumn, 'DEBUG') 

84 config['datacolumn'] = datacolumn 

85 

86 # Call MSTransform framework with hanning=True 

87 config['hanning'] = True 

88 # Parse smooth_spw 

89 smooth_spw_config = [] 

90 if isinstance(smooth_spw, str) : 

91 if(smooth_spw != '') : 

92 spw_ranges = smooth_spw.split(',') 

93 for range in spw_ranges: 

94 if range.strip().isdigit() : 

95 smooth_spw_config.append(int(range)) 

96 else : 

97 range_str = range.split('~') 

98 if (len(range_str) != 2): 

99 raise ValueError("smooth_spw must be a list of single SPWs, a str\ 

100ing with comma separated SPWs or SPWs ranges with ~. Cannot parse string as a SPW range: ",range_str) 

101 spw_init=range_str[0] 

102 spw_end=range_str[1] 

103 if(not spw_init.isdigit() or not spw_end.isdigit() ) : 

104 raise ValueError("smooth_spw must be a list of single SPWs, a str\ 

105ing with comma separated SPWs or SPWs ranges with ~. Cannot parse range start or end: ",range_str) 

106 for i in numpy.arange(int(spw_init), int(spw_end)+1) : 

107 smooth_spw_config.append(i) 

108 else : 

109 smooth_spw_config.append(smooth_spw) 

110 

111 if(len(smooth_spw_config) == 0 ) : 

112 smooth_spw_config = None 

113 config['smooth_spw'] = smooth_spw_config 

114 config['reindex'] = False 

115 

116 # Configure the tool  

117 casalog.post('%s'%config, 'DEBUG1') 

118 mtlocal.config(config) 

119 

120 # Open the MS, select the data and configure the output 

121 mtlocal.open() 

122 

123 # Run the tool 

124 casalog.post('Apply Hanning smoothing on data') 

125 mtlocal.run() 

126 

127 finally: 

128 mtlocal.done() 

129 

130 # Write history to output MS, not the input ms. 

131 try: 

132 param_names = hanningsmooth.__code__.co_varnames[:hanningsmooth.__code__.co_argcount] 

133 local_vars = locals() 

134 param_vals = [local_vars[p] for p in param_names] 

135 

136 casalog.post('Updating the history in the output', 'DEBUG1') 

137 write_history(mslocal, outputvis, 'hanningsmooth', param_names, 

138 param_vals, casalog) 

139 except Exception as instance: 

140 casalog.post("*** Error \'%s\' updating HISTORY" % (instance),'WARN') 

141 

142 mslocal = None 

143 

144