Coverage for /home/casatest/venv/lib/python3.12/site-packages/casatasks/private/casaxmlutil.py: 26%
195 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-21 07:43 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-21 07:43 +0000
1import os
2import functools
3import inspect
4from xml.dom import minidom
6import casatasks
8# constants for generating converter method
9__FUNCTION = 'override_args'
10__ARGS = '_a'
11__ARGS_DICT = '_d'
12__ARGS_SUPPLIED = '_s'
13__LOGLEVEL_IN_FUNCTION = 'INFO'
15__DEBUG = False
16if __DEBUG:
17 from pprint import pprint
20def xml_constraints_injector(func):
21 """Decorator which loads constraints from a casatask XML file and apply them to the arguments of the decorated casatask.
23 This method is designed as decorator for task methods. It executes as below:
24 1. converts a constraints element of a CASA task XML to a Python code.
25 2. evaluates the code, then a Python function is generated.
26 3. executes the function and overrides task arguments to values defined by constraints tag.
28 ex)
29 a constraints tag of a CASA task XML:
31 <constraints>
32 <when param="timebin">
33 <notequals type="string" value="">
34 <default param="timespan"><value type="string"/></default>
35 </notequals>
36 </when>
37 <when param="fitmode">
38 <equals value="list">
39 <default param="nfit"><value type="vector"><value>0</value></value></default>
40 </equals>
41 <equals value="auto">
42 <default param="thresh"><value>5.0</value></default>
43 <default param="avg_limit"><value>4</value></default>
44 <default param="minwidth"><value>4</value></default>
45 <default param="edge"><value type="vector"><value>0</value></value></default>
46 </equals>
47 <equals value="interact">
48 <default param="nfit"><value type="vector"><value>0</value></value></default>
49 </equals>
50 </when>
51 </constraints>
53 generated Python function code from the above XML:
55 def override_args(_a, _d, _s): # _a: position args based on *args
56 # _d: dict[key: position name, val: corresponding position index of a key]
57 # to use to get a position index of args by position name
58 # _s: boolean array, it is the same length as the position args,
59 # and the positions of user-supplied arguments are set to True
60 if _d.get('timebin') is not None and _a[_d['timebin']] != '':
61 if _d.get('timespan') is not None and _s[_d['timespan']] is False and _a[_d['timespan']] == "":
62 _a[_d['timespan']] = ''
63 casatasks.casalog.post("overrode argument: timespan -> ''", "INFO")
64 if _d.get('fitmode') is not None and _a[_d['fitmode']] == 'list':
65 if _d.get('nfit') is not None and _s[_d['nfit']] is False and _a[_d['nfit']] == "":
66 _a[_d['nfit']] = [0]
67 casatasks.casalog.post("overrode argument: nfit -> [0]", "INFO")
68 if _d.get('fitmode') is not None and _a[_d['fitmode']] == 'auto':
69 if _d.get('thresh') is not None and _s[_d['thresh']] is False and _a[_d['thresh']] == "":
70 _a[_d['thresh']] = 5.0
71 casatasks.casalog.post("overrode argument: thresh -> 5.0", "INFO")
72 if _d.get('avg_limit') is not None and _s[_d['avg_limit']] is False and _a[_d['avg_limit']] == "":
73 _a[_d['avg_limit']] = 4
74 casatasks.casalog.post("overrode argument: avg_limit -> 4", "INFO")
75 if _d.get('minwidth') is not None and _s[_d['minwidth']] is False and _a[_d['minwidth']] == "":
76 _a[_d['minwidth']] = 4
77 casatasks.casalog.post("overrode argument: minwidth -> 4", "INFO")
78 if _d.get('edge') is not None and _s[_d['edge']] is False and _a[_d['edge']] == "":
79 _a[_d['edge']] = [0]
80 casatasks.casalog.post("overrode argument: edge -> [0]", "INFO")
81 if _d.get('fitmode') is not None and _a[_d['fitmode']] == 'interact':
82 if _d.get('nfit') is not None and _s[_d['nfit']] is False and _a[_d['nfit']] == "":
83 _a[_d['nfit']] = [0]
84 casatasks.casalog.post("overrode argument: nfit -> [0]", "INFO")
86 Note: handling of <kwarg/> tag of task XML files
87 Subparameters whose default value is the empty string '' - but where the empty string means in fact that
88 the real default value must be set to some non-empty string - require special care. One must be able to
89 determine whether the empty string was user-supplied or not.
90 To make this determination possible, the <kwarg/> tag must be set in the <param> tag definition of such
91 parameters in the task XML file. This is currently the case only for parameter 'intent' of task sdcal,
92 where intent='' means intent='all'. See sdcal.xml.
95 Parameters
96 ----------
97 func : function
98 The casatask function to be decorated
100 Returns
101 -------
102 wrapper: function
103 A decorated casatask satisfying the XML constraints
104 """
105 @functools.wraps(func)
106 def wrapper(*args, **kwargs):
107 retval = None
108 # Any errors are handled outside the task.
109 # however, the implementation below is effectively
110 # equivalent to handling it inside the task.
111 funcname = func.__name__
113 # load the function name and arguments which is wrapped the decorator
114 # get an object reference to read informantion of argument
115 func_ = func.__dict__.get('__wrapped__', func)
117 is_recursive_load = False
118 is_called_from_casatasks = False
119 for frame_info in inspect.stack():
120 if frame_info.function == func_.__name__:
121 # when the task is called from the same task (ex: sdcal with two calmodes calls itself)
122 is_recursive_load = True
123 if frame_info.function == '__call__' and \
124 frame_info.frame.f_locals['self'].__module__ == 'casatasks.' + func_.__name__:
125 # check whether the function is called from casatasks or not.
126 is_called_from_casatasks = True
128 if is_recursive_load:
129 casatasks.casalog.post('recursive task call', 'INFO')
130 retval = func(*args, **kwargs)
131 elif not is_called_from_casatasks:
132 retval = func(*args, **kwargs)
133 else:
134 # generate the argument specification and the injector method from a task xml
135 args_, args_position_dict, converter_function_string = __load_xml(funcname)
137 # Note: the length of args is reduced by the length of kwargs.
138 for i in range(len(args)):
139 args_[i] = args[i]
140 supplied_args_flags = [False] * len(args_position_dict)
142 kwargs_ = dict()
143 for k, v in kwargs.items():
144 if args_position_dict.get(k) is not None:
145 args_[args_position_dict[k]] = v
146 supplied_args_flags[args_position_dict[k]] = True
147 else:
148 kwargs_[k] = v
150 if __DEBUG:
151 print(converter_function_string)
152 pprint(args_position_dict)
153 pprint(args_)
155 # override args by the converter generated from xml
156 casatasks.casalog.post('loaded constraints from XML', 'DEBUG')
157 _local = {}
158 exec(converter_function_string, globals(), _local)
159 _local[__FUNCTION](args_, args_position_dict, supplied_args_flags)
161 # execute task
162 retval = func(*args_, **kwargs_)
164 return retval
165 return wrapper
168def __get_taskxmlfilepath(task):
169 xmlpath = os.path.abspath(casatasks.__path__[0]) + '/__xml__'
170 taskxmlfile = f'{xmlpath}/{task}.xml'
171 if not os.path.isfile(taskxmlfile):
172 raise ValueError
173 if not os.access(taskxmlfile, os.R_OK):
174 return PermissionError
175 return taskxmlfile
178def __load_xml(task):
179 taskxml = __get_taskxmlfilepath(task)
181 stmt = []
182 dom = minidom.parse(taskxml)
183 constraints = dom.getElementsByTagName('constraints')[0]
184 for s in constraints.getElementsByTagName('when'):
185 __handle_when(s, stmt)
186 args = [__generate_default_value(param) for param in dom.getElementsByTagName('param')]
187 args_position_dict = {param.getAttribute('name'): i for i, param in enumerate(dom.getElementsByTagName('param'))}
188 return args, args_position_dict, __convert_stmt_to_pycode(stmt)
191def __generate_default_value(param):
192 type_ = param.getAttribute('type')
193 value_, type_ = __handle_value(param.getElementsByTagName('value')[0], type_)
194 if type_ == 'int':
195 return int(value_)
196 elif type_ == 'double':
197 return float(value_)
198 elif type_ == 'bool':
199 return value_ == 'True'
200 return value_
203def __convert_stmt_to_pycode(stmt_list):
204 ret = f'def {__FUNCTION}({__ARGS}, {__ARGS_DICT}, {__ARGS_SUPPLIED}):\n'
205 if len(stmt_list) > 0:
206 for [stmt, indent] in stmt_list:
207 ret += __indent(indent) + stmt + '\n'
208 else:
209 ret += __indent(1) + 'pass\n'
210 return ret
213""" constants and methods for converting from XML tree to Python code """
214__QUOTE = '\''
215__OP_EQUALS = '=='
216__OP_ASSIGN = '='
217__OP_IS = 'is'
218__OP_NOT_EQUAL = '!='
219__OP_AND = 'and'
220__OP_NOT = 'not'
221__NONE = 'None'
224def __handle_when(when, stmt):
225 # <when>
226 for node in when.childNodes:
227 if node.nodeName == 'equals':
228 __handle_equals_or_not_equals(when, node, stmt, __OP_EQUALS)
229 elif node.nodeName == 'notequals':
230 __handle_equals_or_not_equals(when, node, stmt, __OP_NOT_EQUAL)
233def __handle_equals_or_not_equals(when, elem, stmt, operator):
234 # <equals> or <notequals>
235 indent_level = 1
236 defaults = elem.getElementsByTagName('default')
237 if len(defaults) > 0:
238 stmt.append([__when(__get_param(when), operator, __get_value(elem)), indent_level])
239 for default_ in defaults:
240 left = __get_param(default_)
241 right = default_.getElementsByTagName('value')[0]
242 __handle_default(left, right, stmt, indent_level)
245def __handle_default(left, right, stmt, indent_level):
246 # <default>
247 quote = ''
248 right, type_ = __handle_value(right)
249 if type_ == 'string' or type_ == 'record' or type_ == 'stringVec':
250 quote = __QUOTE
251 if type_.endswith('Vec') or type_ == 'vector':
252 if isinstance(right, list):
253 right = ','.join([f'{quote}{r}{quote}' for r in right])
254 right = f'[{right}]'
255 else:
256 right = f'{quote}{right}{quote}'
257 if_ = __if(
258 __and(__can_get(__ARGS_DICT, left),
259 __and(
260 __is(__list(__ARGS_SUPPLIED, __dict(__ARGS_DICT, left)), False),
261 __equals(__list(__ARGS, __dict(__ARGS_DICT, left)), '""'))
262 )
263 )
264 stmt.append([if_, indent_level + 1])
265 stmt.append([__assign(__list(__ARGS, __dict(__ARGS_DICT, left)), right), indent_level + 2])
266 stmt.append([__casalog(left, right), indent_level + 2])
269def __handle_value(_element, type_='int'):
270 # <value>, it contains <value> tags or a value
271 if _element.nodeName == 'value':
272 if _element.hasAttribute('type'):
273 type_ = _element.getAttribute('type')
274 values = _element.getElementsByTagName('value')
275 if len(values) > 0:
276 return [__handle_value(v, type_)[0] for v in values], type_
277 if _element.firstChild:
278 return __handle_value(_element.firstChild, type_)
279 elif hasattr(_element, 'data'):
280 return _element.data, type_
281 return '', type_
284def __get_param(doc):
285 return __get_attr(doc, 'param')
288def __get_value(doc):
289 if doc.hasAttribute('type') and doc.getAttribute('type') == 'vector':
290 return __get_value(doc.firstChild)
291 return __get_attr(doc, 'value')
294def __get_attr(doc, param):
295 s = doc.getAttribute(param)
296 if s == '' or s:
297 return s
298 raise Exception('XML Parse Error')
301def __when(left, operator, right):
302 if ',' in right:
303 right_ = ','.join(sorted([s.strip() for s in right.split(',')]))
304 left_ = f"','.join(sorted([s.strip() for s in {__list(__ARGS, __dict(__ARGS_DICT, left))}.split(',')]))"
305 else:
306 right_ = right
307 left_ = f'{__ARGS}[{__ARGS_DICT}[{__QUOTE}{left}{__QUOTE}]]'
308 right_ = f'{__QUOTE}{right_}{__QUOTE}'
310 return __if(__and(__can_get(__ARGS_DICT, left), __exp(left_, operator, right_)))
313def __and(left, right):
314 return __exp(left, __OP_AND, right)
317def __assign(left, right):
318 return __exp(left, __OP_ASSIGN, right)
321def __is(left, right):
322 return __exp(left, __OP_IS, right)
325def __equals(left, right):
326 return __exp(left, __OP_EQUALS, right)
329def __exp(left, operator, right):
330 return f'{left} {operator} {right}'
333def __not(right):
334 return f'{__OP_NOT} {right}'
337def __if(exp):
338 return f'if {exp}:'
341def __get(val, operand, exp=None):
342 if exp:
343 return f'{val}.get({__QUOTE}{operand}{__QUOTE}, {exp})'
344 return f'{val}.get({__QUOTE}{operand}{__QUOTE})'
347def __can_get(val, operand):
348 return __is(__get(val, operand), __not(__NONE))
351def __casalog(left, right):
352 return f'casatasks.casalog.post("overrode argument: {left} -> {right}", "{__LOGLEVEL_IN_FUNCTION}")'
355def __dict(val, pos):
356 return f'{val}[{__QUOTE}{pos}{__QUOTE}]'
359def __list(val, pos):
360 return f'{val}[{pos}]'
363def __indent(level):
364 return ' ' * 4 * level
367if __name__ == '__main__':
369 @xml_constraints_injector
370 def sdcal(infile=None, calmode='tsys', fraction='10%', noff=-1,
371 width=0.5, elongated=False, applytable='', interp='', spwmap={},
372 outfile='', overwrite=False, field='', spw='', scan='', intent=''):
373 print(calmode)
374 print(fraction)
375 print(intent)
377 class _sdcal_py:
379 def __call__(self, infile=None, calmode='tsys', fraction='10%', noff=-1,
380 width=0.5, elongated=False, applytable='', interp='', spwmap={},
381 outfile='', overwrite=False, field='', spw='', scan='', **kwargs):
382 sdcal(infile, calmode, fraction, noff, width, elongated, applytable, interp, spwmap,
383 outfile, overwrite, field, spw, scan, **kwargs)
385 # Note: on L124 "frame_info.frame.f_locals['self'].__module__" is '__main__' when below lines are executed,
386 # so we cannot see the behavior of constraints in __main__ for now.
387 # If you want to see it, replace the conditional expression to be True temporarily.
388 x_sdcal = _sdcal_py()
389 x_sdcal('test', calmode='otfraster,apply')