Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_sdgaincal.py: 76%

78 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1import os 

2 

3from casatasks import casalog 

4 

5from . import sdutil 

6from .sdutil import calibrater_manager 

7 

8DEFAULT_VALUE = {'interp': 'linear', 

9 'spwmap': [-1]} 

10 

11 

12def parse_interp_item(interp): 

13 assert isinstance(interp, str) 

14 if len(interp) == 0: 

15 return DEFAULT_VALUE['interp'] 

16 else: 

17 return interp 

18 

19 

20def parse_interp(interp, index): 

21 assert index >= 0 

22 if isinstance(interp, str): 

23 # interp is a string that is valid to all applytables 

24 return parse_interp_item(interp) 

25 elif hasattr(interp, '__iter__'): 

26 # interp is a list of strings 

27 if index >= len(interp): 

28 # wrong index or empty list 

29 return DEFAULT_VALUE['interp'] 

30 else: 

31 # interp is a list of strings 

32 return parse_interp_item(interp[index]) 

33 assert False 

34 

35 

36def parse_spwmap_item(spwmap): 

37 assert hasattr(spwmap, '__iter__') 

38 if len(spwmap) == 0: 

39 return DEFAULT_VALUE['spwmap'] 

40 else: 

41 return spwmap 

42 

43 

44def parse_spwmap(spwmap, index): 

45 assert hasattr(spwmap, '__iter__') 

46 assert index >= 0 

47 if len(spwmap) == 0: 

48 # empty list 

49 return DEFAULT_VALUE['spwmap'] 

50 elif all(map(lambda x: hasattr(x, '__iter__'), spwmap)): 

51 # spwmap is list-of-list 

52 if index >= len(spwmap): 

53 # maybe wrong index 

54 return DEFAULT_VALUE['spwmap'] 

55 else: 

56 return parse_spwmap_item(spwmap[index]) 

57 elif all(map(lambda x: isinstance(x, int), spwmap)): 

58 # maybe single spwmap that is valid to all applytables 

59 return spwmap 

60 assert False 

61 

62 

63@sdutil.sdtask_decorator 

64def sdgaincal(infile=None, calmode=None, radius=None, smooth=None, 

65 antenna=None, field=None, spw=None, scan=None, intent=None, 

66 applytable=None, interp=None, spwmap=None, outfile='', overwrite=False): 

67 

68 # outfile must be specified 

69 if (outfile == '') or not isinstance(outfile, str): 

70 raise ValueError("outfile is empty.") 

71 

72 # overwrite check 

73 if os.path.exists(outfile) and not overwrite: 

74 raise RuntimeError(outfile + ' exists.') 

75 

76 if infile is None or not isinstance(infile, str) or not os.path.exists(infile): 

77 raise RuntimeError('infile not found - please verify the name') 

78 

79 # Calibrater tool 

80 with calibrater_manager(infile) as mycb: 

81 

82 # select data 

83 if isinstance(antenna, str) and len(antenna) > 0: 

84 baseline = '{ant}&&&'.format(ant=antenna) 

85 else: 

86 baseline = '' 

87 mycb.selectvis(spw=spw, scan=scan, field=field, intent=intent, baseline=baseline) 

88 

89 # set apply 

90 casalog.post('interp="{0}" spwmap={1}'.format(interp, spwmap)) 

91 if isinstance(applytable, str): 

92 if len(applytable) > 0: 

93 thisinterp = parse_interp(interp, 0) 

94 thisspwmap = parse_spwmap(spwmap, 0) 

95 casalog.post('thisinterp="{0}" thisspwmap={1}'.format(thisinterp, thisspwmap)) 

96 mycb.setapply(table=applytable, interp=thisinterp, spwmap=thisspwmap) 

97 elif hasattr(applytable, '__iter__'): 

98 # list type 

99 for i in range(len(applytable)): 

100 table = applytable[i] 

101 if isinstance(table, str) and len(table) > 0: 

102 thisinterp = parse_interp(interp, i) 

103 thisspwmap = parse_spwmap(spwmap, i) 

104 casalog.post('thisinterp="{0}" thisspwmap={1}'.format(thisinterp, thisspwmap)) 

105 mycb.setapply(table=table, interp=thisinterp, spwmap=thisspwmap) 

106 else: 

107 raise RuntimeError( 

108 f'wrong type of applytable item ({type(table)}). it should be string') 

109 else: 

110 raise RuntimeError( 

111 f'wrong type of applytable ({type(applytable)}). it should be string or list') 

112 

113 # set solve 

114 if calmode == 'doublecircle': 

115 if radius is None: 

116 raise RuntimeError('radius must be specified.') 

117 elif not isinstance(radius, str): 

118 rcenter = '%sarcsec' % (radius) 

119 else: 

120 try: 

121 # if radius is a string only consists of numeric value without unit, 

122 # it will succeed. 

123 rcenter = '%sarcsec' % (float(radius)) 

124 except Exception: 

125 # if the above fails, it may indicate that the string contains unit 

126 rcenter = radius 

127 mycb.setsolve(type='SDGAIN_OTFD', table=outfile, radius=rcenter, smooth=smooth) 

128 else: 

129 raise RuntimeError('Unknown calibration mode: \'{mode}\''.format(mode=calmode)) 

130 

131 # solve 

132 mycb.solve()