Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/casaxmlutil.py: 87%

194 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1import os 

2import functools 

3import inspect 

4from xml.dom import minidom 

5 

6import casatasks 

7 

8# constants for generating converter method 

9__FUNCTION = 'override_args' 

10__ARGS = '_a' 

11__ARGS_DICT = '_d' 

12__ARGS_SUPPLIED = '_s' 

13__LOGLEVEL_IN_FUNCTION = 'INFO' 

14 

15__DEBUG = False 

16if __DEBUG: 

17 from pprint import pprint 

18 

19 

20def xml_constraints_injector(func): 

21 """Decorator which loads constraints from a casatask XML file and apply them to the arguments of the decorated casatask. 

22 

23 This method is designed as decorator for task methods. It executes as below: 

24 1. converts a constraints element of a CASA task XML to a Python code. 

25 2. evaluates the code, then a Python function is generated. 

26 3. executes the function and overrides task arguments to values defined by constraints tag. 

27 

28 ex) 

29 a constraints tag of a CASA task XML: 

30 

31 <constraints> 

32 <when param="timebin"> 

33 <notequals type="string" value=""> 

34 <default param="timespan"><value type="string"/></default> 

35 </notequals> 

36 </when> 

37 <when param="fitmode"> 

38 <equals value="list"> 

39 <default param="nfit"><value type="vector"><value>0</value></value></default> 

40 </equals> 

41 <equals value="auto"> 

42 <default param="thresh"><value>5.0</value></default> 

43 <default param="avg_limit"><value>4</value></default> 

44 <default param="minwidth"><value>4</value></default> 

45 <default param="edge"><value type="vector"><value>0</value></value></default> 

46 </equals> 

47 <equals value="interact"> 

48 <default param="nfit"><value type="vector"><value>0</value></value></default> 

49 </equals> 

50 </when> 

51 </constraints> 

52 

53 generated Python function code from the above XML: 

54 

55 def override_args(_a, _d, _s): # _a: position args based on *args 

56 # _d: dict[key: position name, val: corresponding position index of a key] 

57 # to use to get a position index of args by position name 

58 # _s: boolean array, it is the same length as the position args, 

59 # and the positions of user-supplied arguments are set to True 

60 if _d.get('timebin') is not None and _a[_d['timebin']] != '': 

61 if _d.get('timespan') is not None and _s[_d['timespan']] is False and _a[_d['timespan']] == "": 

62 _a[_d['timespan']] = '' 

63 casatasks.casalog.post("overrode argument: timespan -> ''", "INFO") 

64 if _d.get('fitmode') is not None and _a[_d['fitmode']] == 'list': 

65 if _d.get('nfit') is not None and _s[_d['nfit']] is False and _a[_d['nfit']] == "": 

66 _a[_d['nfit']] = [0] 

67 casatasks.casalog.post("overrode argument: nfit -> [0]", "INFO") 

68 if _d.get('fitmode') is not None and _a[_d['fitmode']] == 'auto': 

69 if _d.get('thresh') is not None and _s[_d['thresh']] is False and _a[_d['thresh']] == "": 

70 _a[_d['thresh']] = 5.0 

71 casatasks.casalog.post("overrode argument: thresh -> 5.0", "INFO") 

72 if _d.get('avg_limit') is not None and _s[_d['avg_limit']] is False and _a[_d['avg_limit']] == "": 

73 _a[_d['avg_limit']] = 4 

74 casatasks.casalog.post("overrode argument: avg_limit -> 4", "INFO") 

75 if _d.get('minwidth') is not None and _s[_d['minwidth']] is False and _a[_d['minwidth']] == "": 

76 _a[_d['minwidth']] = 4 

77 casatasks.casalog.post("overrode argument: minwidth -> 4", "INFO") 

78 if _d.get('edge') is not None and _s[_d['edge']] is False and _a[_d['edge']] == "": 

79 _a[_d['edge']] = [0] 

80 casatasks.casalog.post("overrode argument: edge -> [0]", "INFO") 

81 if _d.get('fitmode') is not None and _a[_d['fitmode']] == 'interact': 

82 if _d.get('nfit') is not None and _s[_d['nfit']] is False and _a[_d['nfit']] == "": 

83 _a[_d['nfit']] = [0] 

84 casatasks.casalog.post("overrode argument: nfit -> [0]", "INFO") 

85 

86 Note: handling of <kwarg/> tag of task XML files 

87 Subparameters whose default value is the empty string '' - but where the empty string means in fact that  

88 the real default value must be set to some non-empty string - require special care. One must be able to 

89 determine whether the empty string was user-supplied or not. 

90 To make this determination possible, the <kwarg/> tag must be set in the <param> tag definition of such 

91 parameters in the task XML file. This is currently the case only for parameter 'intent' of task sdcal, 

92 where intent='' means intent='all'. See sdcal.xml. 

93 

94 

95 Parameters 

96 ---------- 

97 func : function 

98 The casatask function to be decorated 

99 

100 Returns 

101 ------- 

102 wrapper: function 

103 A decorated casatask satisfying the XML constraints 

104 """ 

105 @functools.wraps(func) 

106 def wrapper(*args, **kwargs): 

107 retval = None 

108 # Any errors are handled outside the task. 

109 # however, the implementation below is effectively 

110 # equivalent to handling it inside the task. 

111 funcname = func.__name__ 

112 

113 # load the function name and arguments which is wrapped the decorator 

114 # get an object reference to read informantion of argument 

115 func_ = func.__dict__.get('__wrapped__', func) 

116 

117 is_recursive_load = False 

118 is_called_from_casatasks = False 

119 for frame_info in inspect.stack(): 

120 if frame_info.function == func_.__name__: 

121 # when the task is called from the same task (ex: sdcal with two calmodes calls itself) 

122 is_recursive_load = True 

123 if frame_info.function == '__call__' and \ 

124 frame_info.frame.f_locals['self'].__module__ == 'casatasks.' + func_.__name__: 

125 # check whether the function is called from casatasks or not. 

126 is_called_from_casatasks = True 

127 

128 if is_recursive_load: 

129 casatasks.casalog.post('recursive task call', 'INFO') 

130 retval = func(*args, **kwargs) 

131 elif not is_called_from_casatasks: 

132 retval = func(*args, **kwargs) 

133 else: 

134 # generate the argument specification and the injector method from a task xml 

135 args_, args_position_dict, converter_function_string = __load_xml(funcname) 

136 

137 # Note: the length of args is reduced by the length of kwargs. 

138 for i in range(len(args)): 

139 args_[i] = args[i] 

140 supplied_args_flags = [False] * len(args_position_dict) 

141 

142 kwargs_ = dict() 

143 for k, v in kwargs.items(): 

144 if args_position_dict.get(k) is not None: 

145 args_[args_position_dict[k]] = v 

146 supplied_args_flags[args_position_dict[k]] = True 

147 else: 

148 kwargs_[k] = v 

149 

150 if __DEBUG: 

151 print(converter_function_string) 

152 pprint(args_position_dict) 

153 pprint(args_) 

154 

155 # override args by the converter generated from xml 

156 casatasks.casalog.post('loaded constraints from XML', 'DEBUG') 

157 exec(converter_function_string) 

158 exec(f'{__FUNCTION}(args_, args_position_dict, supplied_args_flags)') 

159 

160 # execute task 

161 retval = func(*args_, **kwargs_) 

162 

163 return retval 

164 return wrapper 

165 

166 

167def __get_taskxmlfilepath(task): 

168 xmlpath = os.path.abspath(casatasks.__path__[0]) + '/__xml__' 

169 taskxmlfile = f'{xmlpath}/{task}.xml' 

170 if not os.path.isfile(taskxmlfile): 

171 raise ValueError 

172 if not os.access(taskxmlfile, os.R_OK): 

173 return PermissionError 

174 return taskxmlfile 

175 

176 

177def __load_xml(task): 

178 taskxml = __get_taskxmlfilepath(task) 

179 

180 stmt = [] 

181 dom = minidom.parse(taskxml) 

182 constraints = dom.getElementsByTagName('constraints')[0] 

183 for s in constraints.getElementsByTagName('when'): 

184 __handle_when(s, stmt) 

185 args = [__generate_default_value(param) for param in dom.getElementsByTagName('param')] 

186 args_position_dict = {param.getAttribute('name'): i for i, param in enumerate(dom.getElementsByTagName('param'))} 

187 return args, args_position_dict, __convert_stmt_to_pycode(stmt) 

188 

189 

190def __generate_default_value(param): 

191 type_ = param.getAttribute('type') 

192 value_, type_ = __handle_value(param.getElementsByTagName('value')[0], type_) 

193 if type_ == 'int': 

194 return int(value_) 

195 elif type_ == 'double': 

196 return float(value_) 

197 elif type_ == 'bool': 

198 return value_ == 'True' 

199 return value_ 

200 

201 

202def __convert_stmt_to_pycode(stmt_list): 

203 ret = f'def {__FUNCTION}({__ARGS}, {__ARGS_DICT}, {__ARGS_SUPPLIED}):\n' 

204 if len(stmt_list) > 0: 

205 for [stmt, indent] in stmt_list: 

206 ret += __indent(indent) + stmt + '\n' 

207 else: 

208 ret += __indent(1) + 'pass\n' 

209 return ret 

210 

211 

212""" constants and methods for converting from XML tree to Python code """ 

213__QUOTE = '\'' 

214__OP_EQUALS = '==' 

215__OP_ASSIGN = '=' 

216__OP_IS = 'is' 

217__OP_NOT_EQUAL = '!=' 

218__OP_AND = 'and' 

219__OP_NOT = 'not' 

220__NONE = 'None' 

221 

222 

223def __handle_when(when, stmt): 

224 # <when> 

225 for node in when.childNodes: 

226 if node.nodeName == 'equals': 

227 __handle_equals_or_not_equals(when, node, stmt, __OP_EQUALS) 

228 elif node.nodeName == 'notequals': 

229 __handle_equals_or_not_equals(when, node, stmt, __OP_NOT_EQUAL) 

230 

231 

232def __handle_equals_or_not_equals(when, elem, stmt, operator): 

233 # <equals> or <notequals> 

234 indent_level = 1 

235 defaults = elem.getElementsByTagName('default') 

236 if len(defaults) > 0: 

237 stmt.append([__when(__get_param(when), operator, __get_value(elem)), indent_level]) 

238 for default_ in defaults: 

239 left = __get_param(default_) 

240 right = default_.getElementsByTagName('value')[0] 

241 __handle_default(left, right, stmt, indent_level) 

242 

243 

244def __handle_default(left, right, stmt, indent_level): 

245 # <default> 

246 quote = '' 

247 right, type_ = __handle_value(right) 

248 if type_ == 'string' or type_ == 'record' or type_ == 'stringVec': 

249 quote = __QUOTE 

250 if type_.endswith('Vec') or type_ == 'vector': 

251 if isinstance(right, list): 

252 right = ','.join([f'{quote}{r}{quote}' for r in right]) 

253 right = f'[{right}]' 

254 else: 

255 right = f'{quote}{right}{quote}' 

256 if_ = __if( 

257 __and(__can_get(__ARGS_DICT, left), 

258 __and( 

259 __is(__list(__ARGS_SUPPLIED, __dict(__ARGS_DICT, left)), False), 

260 __equals(__list(__ARGS, __dict(__ARGS_DICT, left)), '""')) 

261 ) 

262 ) 

263 stmt.append([if_, indent_level + 1]) 

264 stmt.append([__assign(__list(__ARGS, __dict(__ARGS_DICT, left)), right), indent_level + 2]) 

265 stmt.append([__casalog(left, right), indent_level + 2]) 

266 

267 

268def __handle_value(_element, type_='int'): 

269 # <value>, it contains <value> tags or a value 

270 if _element.nodeName == 'value': 

271 if _element.hasAttribute('type'): 

272 type_ = _element.getAttribute('type') 

273 values = _element.getElementsByTagName('value') 

274 if len(values) > 0: 

275 return [__handle_value(v, type_)[0] for v in values], type_ 

276 if _element.firstChild: 

277 return __handle_value(_element.firstChild, type_) 

278 elif hasattr(_element, 'data'): 

279 return _element.data, type_ 

280 return '', type_ 

281 

282 

283def __get_param(doc): 

284 return __get_attr(doc, 'param') 

285 

286 

287def __get_value(doc): 

288 if doc.hasAttribute('type') and doc.getAttribute('type') == 'vector': 

289 return __get_value(doc.firstChild) 

290 return __get_attr(doc, 'value') 

291 

292 

293def __get_attr(doc, param): 

294 s = doc.getAttribute(param) 

295 if s == '' or s: 

296 return s 

297 raise Exception('XML Parse Error') 

298 

299 

300def __when(left, operator, right): 

301 if ',' in right: 

302 right_ = ','.join(sorted([s.strip() for s in right.split(',')])) 

303 left_ = f"','.join(sorted([s.strip() for s in {__list(__ARGS, __dict(__ARGS_DICT, left))}.split(',')]))" 

304 else: 

305 right_ = right 

306 left_ = f'{__ARGS}[{__ARGS_DICT}[{__QUOTE}{left}{__QUOTE}]]' 

307 right_ = f'{__QUOTE}{right_}{__QUOTE}' 

308 

309 return __if(__and(__can_get(__ARGS_DICT, left), __exp(left_, operator, right_))) 

310 

311 

312def __and(left, right): 

313 return __exp(left, __OP_AND, right) 

314 

315 

316def __assign(left, right): 

317 return __exp(left, __OP_ASSIGN, right) 

318 

319 

320def __is(left, right): 

321 return __exp(left, __OP_IS, right) 

322 

323 

324def __equals(left, right): 

325 return __exp(left, __OP_EQUALS, right) 

326 

327 

328def __exp(left, operator, right): 

329 return f'{left} {operator} {right}' 

330 

331 

332def __not(right): 

333 return f'{__OP_NOT} {right}' 

334 

335 

336def __if(exp): 

337 return f'if {exp}:' 

338 

339 

340def __get(val, operand, exp=None): 

341 if exp: 

342 return f'{val}.get({__QUOTE}{operand}{__QUOTE}, {exp})' 

343 return f'{val}.get({__QUOTE}{operand}{__QUOTE})' 

344 

345 

346def __can_get(val, operand): 

347 return __is(__get(val, operand), __not(__NONE)) 

348 

349 

350def __casalog(left, right): 

351 return f'casatasks.casalog.post("overrode argument: {left} -> {right}", "{__LOGLEVEL_IN_FUNCTION}")' 

352 

353 

354def __dict(val, pos): 

355 return f'{val}[{__QUOTE}{pos}{__QUOTE}]' 

356 

357 

358def __list(val, pos): 

359 return f'{val}[{pos}]' 

360 

361 

362def __indent(level): 

363 return ' ' * 4 * level 

364 

365 

366if __name__ == '__main__': 

367 

368 @xml_constraints_injector 

369 def sdcal(infile=None, calmode='tsys', fraction='10%', noff=-1, 

370 width=0.5, elongated=False, applytable='', interp='', spwmap={}, 

371 outfile='', overwrite=False, field='', spw='', scan='', intent=''): 

372 print(calmode) 

373 print(fraction) 

374 print(intent) 

375 

376 class _sdcal_py: 

377 

378 def __call__(self, infile=None, calmode='tsys', fraction='10%', noff=-1, 

379 width=0.5, elongated=False, applytable='', interp='', spwmap={}, 

380 outfile='', overwrite=False, field='', spw='', scan='', **kwargs): 

381 sdcal(infile, calmode, fraction, noff, width, elongated, applytable, interp, spwmap, 

382 outfile, overwrite, field, spw, scan, **kwargs) 

383 

384 # Note: on L124 "frame_info.frame.f_locals['self'].__module__" is '__main__' when below lines are executed, 

385 # so we cannot see the behavior of constraints in __main__ for now. 

386 # If you want to see it, replace the conditional expression to be True temporarily. 

387 x_sdcal = _sdcal_py() 

388 x_sdcal('test', calmode='otfraster,apply')