Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/casaxmlutil.py: 87%
194 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1import os
2import functools
3import inspect
4from xml.dom import minidom
6import casatasks
8# constants for generating converter method
9__FUNCTION = 'override_args'
10__ARGS = '_a'
11__ARGS_DICT = '_d'
12__ARGS_SUPPLIED = '_s'
13__LOGLEVEL_IN_FUNCTION = 'INFO'
15__DEBUG = False
16if __DEBUG:
17 from pprint import pprint
20def xml_constraints_injector(func):
21 """Decorator which loads constraints from a casatask XML file and apply them to the arguments of the decorated casatask.
23 This method is designed as decorator for task methods. It executes as below:
24 1. converts a constraints element of a CASA task XML to a Python code.
25 2. evaluates the code, then a Python function is generated.
26 3. executes the function and overrides task arguments to values defined by constraints tag.
28 ex)
29 a constraints tag of a CASA task XML:
31 <constraints>
32 <when param="timebin">
33 <notequals type="string" value="">
34 <default param="timespan"><value type="string"/></default>
35 </notequals>
36 </when>
37 <when param="fitmode">
38 <equals value="list">
39 <default param="nfit"><value type="vector"><value>0</value></value></default>
40 </equals>
41 <equals value="auto">
42 <default param="thresh"><value>5.0</value></default>
43 <default param="avg_limit"><value>4</value></default>
44 <default param="minwidth"><value>4</value></default>
45 <default param="edge"><value type="vector"><value>0</value></value></default>
46 </equals>
47 <equals value="interact">
48 <default param="nfit"><value type="vector"><value>0</value></value></default>
49 </equals>
50 </when>
51 </constraints>
53 generated Python function code from the above XML:
55 def override_args(_a, _d, _s): # _a: position args based on *args
56 # _d: dict[key: position name, val: corresponding position index of a key]
57 # to use to get a position index of args by position name
58 # _s: boolean array, it is the same length as the position args,
59 # and the positions of user-supplied arguments are set to True
60 if _d.get('timebin') is not None and _a[_d['timebin']] != '':
61 if _d.get('timespan') is not None and _s[_d['timespan']] is False and _a[_d['timespan']] == "":
62 _a[_d['timespan']] = ''
63 casatasks.casalog.post("overrode argument: timespan -> ''", "INFO")
64 if _d.get('fitmode') is not None and _a[_d['fitmode']] == 'list':
65 if _d.get('nfit') is not None and _s[_d['nfit']] is False and _a[_d['nfit']] == "":
66 _a[_d['nfit']] = [0]
67 casatasks.casalog.post("overrode argument: nfit -> [0]", "INFO")
68 if _d.get('fitmode') is not None and _a[_d['fitmode']] == 'auto':
69 if _d.get('thresh') is not None and _s[_d['thresh']] is False and _a[_d['thresh']] == "":
70 _a[_d['thresh']] = 5.0
71 casatasks.casalog.post("overrode argument: thresh -> 5.0", "INFO")
72 if _d.get('avg_limit') is not None and _s[_d['avg_limit']] is False and _a[_d['avg_limit']] == "":
73 _a[_d['avg_limit']] = 4
74 casatasks.casalog.post("overrode argument: avg_limit -> 4", "INFO")
75 if _d.get('minwidth') is not None and _s[_d['minwidth']] is False and _a[_d['minwidth']] == "":
76 _a[_d['minwidth']] = 4
77 casatasks.casalog.post("overrode argument: minwidth -> 4", "INFO")
78 if _d.get('edge') is not None and _s[_d['edge']] is False and _a[_d['edge']] == "":
79 _a[_d['edge']] = [0]
80 casatasks.casalog.post("overrode argument: edge -> [0]", "INFO")
81 if _d.get('fitmode') is not None and _a[_d['fitmode']] == 'interact':
82 if _d.get('nfit') is not None and _s[_d['nfit']] is False and _a[_d['nfit']] == "":
83 _a[_d['nfit']] = [0]
84 casatasks.casalog.post("overrode argument: nfit -> [0]", "INFO")
86 Note: handling of <kwarg/> tag of task XML files
87 Subparameters whose default value is the empty string '' - but where the empty string means in fact that
88 the real default value must be set to some non-empty string - require special care. One must be able to
89 determine whether the empty string was user-supplied or not.
90 To make this determination possible, the <kwarg/> tag must be set in the <param> tag definition of such
91 parameters in the task XML file. This is currently the case only for parameter 'intent' of task sdcal,
92 where intent='' means intent='all'. See sdcal.xml.
95 Parameters
96 ----------
97 func : function
98 The casatask function to be decorated
100 Returns
101 -------
102 wrapper: function
103 A decorated casatask satisfying the XML constraints
104 """
105 @functools.wraps(func)
106 def wrapper(*args, **kwargs):
107 retval = None
108 # Any errors are handled outside the task.
109 # however, the implementation below is effectively
110 # equivalent to handling it inside the task.
111 funcname = func.__name__
113 # load the function name and arguments which is wrapped the decorator
114 # get an object reference to read informantion of argument
115 func_ = func.__dict__.get('__wrapped__', func)
117 is_recursive_load = False
118 is_called_from_casatasks = False
119 for frame_info in inspect.stack():
120 if frame_info.function == func_.__name__:
121 # when the task is called from the same task (ex: sdcal with two calmodes calls itself)
122 is_recursive_load = True
123 if frame_info.function == '__call__' and \
124 frame_info.frame.f_locals['self'].__module__ == 'casatasks.' + func_.__name__:
125 # check whether the function is called from casatasks or not.
126 is_called_from_casatasks = True
128 if is_recursive_load:
129 casatasks.casalog.post('recursive task call', 'INFO')
130 retval = func(*args, **kwargs)
131 elif not is_called_from_casatasks:
132 retval = func(*args, **kwargs)
133 else:
134 # generate the argument specification and the injector method from a task xml
135 args_, args_position_dict, converter_function_string = __load_xml(funcname)
137 # Note: the length of args is reduced by the length of kwargs.
138 for i in range(len(args)):
139 args_[i] = args[i]
140 supplied_args_flags = [False] * len(args_position_dict)
142 kwargs_ = dict()
143 for k, v in kwargs.items():
144 if args_position_dict.get(k) is not None:
145 args_[args_position_dict[k]] = v
146 supplied_args_flags[args_position_dict[k]] = True
147 else:
148 kwargs_[k] = v
150 if __DEBUG:
151 print(converter_function_string)
152 pprint(args_position_dict)
153 pprint(args_)
155 # override args by the converter generated from xml
156 casatasks.casalog.post('loaded constraints from XML', 'DEBUG')
157 exec(converter_function_string)
158 exec(f'{__FUNCTION}(args_, args_position_dict, supplied_args_flags)')
160 # execute task
161 retval = func(*args_, **kwargs_)
163 return retval
164 return wrapper
167def __get_taskxmlfilepath(task):
168 xmlpath = os.path.abspath(casatasks.__path__[0]) + '/__xml__'
169 taskxmlfile = f'{xmlpath}/{task}.xml'
170 if not os.path.isfile(taskxmlfile):
171 raise ValueError
172 if not os.access(taskxmlfile, os.R_OK):
173 return PermissionError
174 return taskxmlfile
177def __load_xml(task):
178 taskxml = __get_taskxmlfilepath(task)
180 stmt = []
181 dom = minidom.parse(taskxml)
182 constraints = dom.getElementsByTagName('constraints')[0]
183 for s in constraints.getElementsByTagName('when'):
184 __handle_when(s, stmt)
185 args = [__generate_default_value(param) for param in dom.getElementsByTagName('param')]
186 args_position_dict = {param.getAttribute('name'): i for i, param in enumerate(dom.getElementsByTagName('param'))}
187 return args, args_position_dict, __convert_stmt_to_pycode(stmt)
190def __generate_default_value(param):
191 type_ = param.getAttribute('type')
192 value_, type_ = __handle_value(param.getElementsByTagName('value')[0], type_)
193 if type_ == 'int':
194 return int(value_)
195 elif type_ == 'double':
196 return float(value_)
197 elif type_ == 'bool':
198 return value_ == 'True'
199 return value_
202def __convert_stmt_to_pycode(stmt_list):
203 ret = f'def {__FUNCTION}({__ARGS}, {__ARGS_DICT}, {__ARGS_SUPPLIED}):\n'
204 if len(stmt_list) > 0:
205 for [stmt, indent] in stmt_list:
206 ret += __indent(indent) + stmt + '\n'
207 else:
208 ret += __indent(1) + 'pass\n'
209 return ret
212""" constants and methods for converting from XML tree to Python code """
213__QUOTE = '\''
214__OP_EQUALS = '=='
215__OP_ASSIGN = '='
216__OP_IS = 'is'
217__OP_NOT_EQUAL = '!='
218__OP_AND = 'and'
219__OP_NOT = 'not'
220__NONE = 'None'
223def __handle_when(when, stmt):
224 # <when>
225 for node in when.childNodes:
226 if node.nodeName == 'equals':
227 __handle_equals_or_not_equals(when, node, stmt, __OP_EQUALS)
228 elif node.nodeName == 'notequals':
229 __handle_equals_or_not_equals(when, node, stmt, __OP_NOT_EQUAL)
232def __handle_equals_or_not_equals(when, elem, stmt, operator):
233 # <equals> or <notequals>
234 indent_level = 1
235 defaults = elem.getElementsByTagName('default')
236 if len(defaults) > 0:
237 stmt.append([__when(__get_param(when), operator, __get_value(elem)), indent_level])
238 for default_ in defaults:
239 left = __get_param(default_)
240 right = default_.getElementsByTagName('value')[0]
241 __handle_default(left, right, stmt, indent_level)
244def __handle_default(left, right, stmt, indent_level):
245 # <default>
246 quote = ''
247 right, type_ = __handle_value(right)
248 if type_ == 'string' or type_ == 'record' or type_ == 'stringVec':
249 quote = __QUOTE
250 if type_.endswith('Vec') or type_ == 'vector':
251 if isinstance(right, list):
252 right = ','.join([f'{quote}{r}{quote}' for r in right])
253 right = f'[{right}]'
254 else:
255 right = f'{quote}{right}{quote}'
256 if_ = __if(
257 __and(__can_get(__ARGS_DICT, left),
258 __and(
259 __is(__list(__ARGS_SUPPLIED, __dict(__ARGS_DICT, left)), False),
260 __equals(__list(__ARGS, __dict(__ARGS_DICT, left)), '""'))
261 )
262 )
263 stmt.append([if_, indent_level + 1])
264 stmt.append([__assign(__list(__ARGS, __dict(__ARGS_DICT, left)), right), indent_level + 2])
265 stmt.append([__casalog(left, right), indent_level + 2])
268def __handle_value(_element, type_='int'):
269 # <value>, it contains <value> tags or a value
270 if _element.nodeName == 'value':
271 if _element.hasAttribute('type'):
272 type_ = _element.getAttribute('type')
273 values = _element.getElementsByTagName('value')
274 if len(values) > 0:
275 return [__handle_value(v, type_)[0] for v in values], type_
276 if _element.firstChild:
277 return __handle_value(_element.firstChild, type_)
278 elif hasattr(_element, 'data'):
279 return _element.data, type_
280 return '', type_
283def __get_param(doc):
284 return __get_attr(doc, 'param')
287def __get_value(doc):
288 if doc.hasAttribute('type') and doc.getAttribute('type') == 'vector':
289 return __get_value(doc.firstChild)
290 return __get_attr(doc, 'value')
293def __get_attr(doc, param):
294 s = doc.getAttribute(param)
295 if s == '' or s:
296 return s
297 raise Exception('XML Parse Error')
300def __when(left, operator, right):
301 if ',' in right:
302 right_ = ','.join(sorted([s.strip() for s in right.split(',')]))
303 left_ = f"','.join(sorted([s.strip() for s in {__list(__ARGS, __dict(__ARGS_DICT, left))}.split(',')]))"
304 else:
305 right_ = right
306 left_ = f'{__ARGS}[{__ARGS_DICT}[{__QUOTE}{left}{__QUOTE}]]'
307 right_ = f'{__QUOTE}{right_}{__QUOTE}'
309 return __if(__and(__can_get(__ARGS_DICT, left), __exp(left_, operator, right_)))
312def __and(left, right):
313 return __exp(left, __OP_AND, right)
316def __assign(left, right):
317 return __exp(left, __OP_ASSIGN, right)
320def __is(left, right):
321 return __exp(left, __OP_IS, right)
324def __equals(left, right):
325 return __exp(left, __OP_EQUALS, right)
328def __exp(left, operator, right):
329 return f'{left} {operator} {right}'
332def __not(right):
333 return f'{__OP_NOT} {right}'
336def __if(exp):
337 return f'if {exp}:'
340def __get(val, operand, exp=None):
341 if exp:
342 return f'{val}.get({__QUOTE}{operand}{__QUOTE}, {exp})'
343 return f'{val}.get({__QUOTE}{operand}{__QUOTE})'
346def __can_get(val, operand):
347 return __is(__get(val, operand), __not(__NONE))
350def __casalog(left, right):
351 return f'casatasks.casalog.post("overrode argument: {left} -> {right}", "{__LOGLEVEL_IN_FUNCTION}")'
354def __dict(val, pos):
355 return f'{val}[{__QUOTE}{pos}{__QUOTE}]'
358def __list(val, pos):
359 return f'{val}[{pos}]'
362def __indent(level):
363 return ' ' * 4 * level
366if __name__ == '__main__':
368 @xml_constraints_injector
369 def sdcal(infile=None, calmode='tsys', fraction='10%', noff=-1,
370 width=0.5, elongated=False, applytable='', interp='', spwmap={},
371 outfile='', overwrite=False, field='', spw='', scan='', intent=''):
372 print(calmode)
373 print(fraction)
374 print(intent)
376 class _sdcal_py:
378 def __call__(self, infile=None, calmode='tsys', fraction='10%', noff=-1,
379 width=0.5, elongated=False, applytable='', interp='', spwmap={},
380 outfile='', overwrite=False, field='', spw='', scan='', **kwargs):
381 sdcal(infile, calmode, fraction, noff, width, elongated, applytable, interp, spwmap,
382 outfile, overwrite, field, spw, scan, **kwargs)
384 # Note: on L124 "frame_info.frame.f_locals['self'].__module__" is '__main__' when below lines are executed,
385 # so we cannot see the behavior of constraints in __main__ for now.
386 # If you want to see it, replace the conditional expression to be True temporarily.
387 x_sdcal = _sdcal_py()
388 x_sdcal('test', calmode='otfraster,apply')