Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_sdgaincal.py: 76%
78 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1import os
3from casatasks import casalog
5from . import sdutil
6from .sdutil import calibrater_manager
# Fallback values handed back by the parse_* helpers in this module when the
# corresponding 'interp' / 'spwmap' entry is empty or missing.
DEFAULT_VALUE = dict(interp='linear', spwmap=[-1])
def parse_interp_item(interp):
    """Normalize a single interpolation string.

    Args:
        interp: Interpolation specification for one applytable.

    Returns:
        ``interp`` itself when it is non-empty, otherwise
        ``DEFAULT_VALUE['interp']``.

    Raises:
        AssertionError: If ``interp`` is not a string.
    """
    assert isinstance(interp, str)
    # An empty string means "not specified": fall back to the default.
    return interp if interp else DEFAULT_VALUE['interp']
def parse_interp(interp, index):
    """Pick the interpolation string for the applytable at position ``index``.

    Args:
        interp: Either a single string, which applies to every applytable,
            or an iterable of strings with one entry per applytable.
        index: Zero-based position of the applytable being configured.

    Returns:
        The interpolation string to use. ``DEFAULT_VALUE['interp']`` is
        returned when the selected string is empty or when ``index`` lies
        beyond the end of ``interp`` (including the empty-list case).

    Raises:
        AssertionError: If ``index`` is negative, if the selected item is
            not a string, or if ``interp`` is neither a string nor an
            iterable.
    """
    assert index >= 0

    if isinstance(interp, str):
        # A bare string is shared by all applytables.
        return parse_interp_item(interp)

    if hasattr(interp, '__iter__'):
        if index >= len(interp):
            # Wrong index or empty list: nothing specified for this table.
            return DEFAULT_VALUE['interp']
        return parse_interp_item(interp[index])

    # Raised explicitly instead of ``assert False``: a bare assert is removed
    # under ``python -O``, which would let this function return None silently.
    raise AssertionError(
        'interp must be a string or an iterable of strings, got {0}'.format(type(interp)))
def parse_spwmap_item(spwmap):
    """Normalize the spwmap given for a single applytable.

    Args:
        spwmap: Iterable holding the spw mapping for one applytable.

    Returns:
        ``spwmap`` itself (the same object) when it has at least one
        element, otherwise ``DEFAULT_VALUE['spwmap']``.

    Raises:
        AssertionError: If ``spwmap`` has no ``__iter__`` attribute.
    """
    assert hasattr(spwmap, '__iter__')
    # An empty mapping means "not specified": fall back to the default.
    return spwmap if len(spwmap) > 0 else DEFAULT_VALUE['spwmap']
def parse_spwmap(spwmap, index):
    """Pick the spwmap for the applytable at position ``index``.

    Args:
        spwmap: Either a flat iterable of ints, which applies to every
            applytable, or an iterable of iterables with one spwmap per
            applytable.
        index: Zero-based position of the applytable being configured.

    Returns:
        The spwmap to use. ``DEFAULT_VALUE['spwmap']`` is returned when
        ``spwmap`` is empty, when ``index`` lies beyond the end of a
        list-of-lists ``spwmap``, or when the selected entry is empty.

    Raises:
        AssertionError: If ``spwmap`` has no ``__iter__`` attribute, if
            ``index`` is negative, or if the elements of ``spwmap`` are
            neither all iterables nor all ints.
    """
    assert hasattr(spwmap, '__iter__')
    assert index >= 0

    if len(spwmap) == 0:
        # Empty list: nothing specified at all.
        return DEFAULT_VALUE['spwmap']

    if all(hasattr(item, '__iter__') for item in spwmap):
        # List-of-lists: one spwmap per applytable.
        if index >= len(spwmap):
            # Possibly a wrong index; treat as "not specified".
            return DEFAULT_VALUE['spwmap']
        return parse_spwmap_item(spwmap[index])

    if all(isinstance(item, int) for item in spwmap):
        # A single flat spwmap that is shared by all applytables.
        return spwmap

    # Raised explicitly instead of ``assert False``: a bare assert is removed
    # under ``python -O``, which would let this function return None silently.
    raise AssertionError(
        'spwmap must be a list of ints or a list of lists, got {0!r}'.format(spwmap))
@sdutil.sdtask_decorator
def sdgaincal(infile=None, calmode=None, radius=None, smooth=None,
              antenna=None, field=None, spw=None, scan=None, intent=None,
              applytable=None, interp=None, spwmap=None, outfile='', overwrite=False):
    """Configure the calibrater tool for ``infile`` and run its solver.

    The function validates the file arguments, applies the data selection,
    registers the calibration tables to pre-apply, sets up the solve with
    ``outfile`` as the target table, and finally calls ``solve()``.

    Args:
        infile: Path of the input data; must be a string naming an existing
            path.
        calmode: Calibration mode. Only ``'doublecircle'`` is accepted.
        radius: Required for ``'doublecircle'``. A non-string value, or a
            string that converts with ``float()``, gets ``'arcsec'``
            appended; any other string is passed through unchanged.
        smooth: Forwarded to the calibrater's ``setsolve``.
        antenna: When a non-empty string, selected as baseline
            ``'<antenna>&&&'``; otherwise the baseline selection is empty.
        field, spw, scan, intent: Forwarded to the calibrater's
            ``selectvis``.
        applytable: A table name, or a sequence of table names, to register
            with ``setapply``. A single empty string registers nothing.
        interp: Interpolation setting(s), resolved per table by
            ``parse_interp``.
        spwmap: Spw mapping(s), resolved per table by ``parse_spwmap``.
        outfile: Name of the table passed to ``setsolve``; must be a
            non-empty string.
        overwrite: When false, an already existing ``outfile`` is an error.

    Raises:
        ValueError: If ``outfile`` is empty or not a string.
        RuntimeError: If ``outfile`` exists and ``overwrite`` is false, if
            ``infile`` is missing, if ``applytable`` (or one of its items)
            is unacceptable, if ``calmode`` is unknown, or if ``radius`` is
            not given.
    """
    # outfile must be specified
    if outfile == '' or not isinstance(outfile, str):
        raise ValueError("outfile is empty.")

    # refuse to clobber an existing output unless explicitly allowed
    if not overwrite and os.path.exists(outfile):
        raise RuntimeError(outfile + ' exists.')

    # infile must be a string naming something that exists
    if not (isinstance(infile, str) and os.path.exists(infile)):
        raise RuntimeError('infile not found - please verify the name')

    with calibrater_manager(infile) as mycb:

        def register(table, position):
            # Resolve the per-table settings, log them, and register the table.
            chosen_interp = parse_interp(interp, position)
            chosen_spwmap = parse_spwmap(spwmap, position)
            casalog.post(f'thisinterp="{chosen_interp}" thisspwmap={chosen_spwmap}')
            mycb.setapply(table=table, interp=chosen_interp, spwmap=chosen_spwmap)

        # ---- data selection ----
        has_antenna = isinstance(antenna, str) and len(antenna) > 0
        baseline = f'{antenna}&&&' if has_antenna else ''
        mycb.selectvis(spw=spw, scan=scan, field=field, intent=intent, baseline=baseline)

        # ---- tables to pre-apply ----
        casalog.post(f'interp="{interp}" spwmap={spwmap}')
        if isinstance(applytable, str):
            # a single empty name simply means "nothing to apply"
            if len(applytable) > 0:
                register(applytable, 0)
        elif hasattr(applytable, '__iter__'):
            for position in range(len(applytable)):
                entry = applytable[position]
                # every item of a list must be a non-empty string
                if not (isinstance(entry, str) and len(entry) > 0):
                    raise RuntimeError(
                        f'wrong type of applytable item ({type(entry)}). it should be string')
                register(entry, position)
        else:
            raise RuntimeError(
                f'wrong type of applytable ({type(applytable)}). it should be string or list')

        # ---- solve setup ----
        if calmode != 'doublecircle':
            raise RuntimeError(f"Unknown calibration mode: '{calmode}'")
        if radius is None:
            raise RuntimeError('radius must be specified.')

        if isinstance(radius, str):
            try:
                # a purely numeric string has no unit yet, so append one
                rcenter = '%sarcsec' % (float(radius))
            except Exception:
                # conversion failed: the string presumably carries its own unit
                rcenter = radius
        else:
            rcenter = '%sarcsec' % (radius)
        mycb.setsolve(type='SDGAIN_OTFD', table=outfile, radius=rcenter, smooth=smooth)

        # ---- solve ----
        mycb.solve()