Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_importasap.py: 92%

49 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1import os 

2import re 

3 

4from casatasks import casalog 

5from casatools import agentflagger, calibrater, ms, singledishms 

6 

7from . import sdutil 

8from .mstools import write_history 

9 

10mysdms = singledishms() 

11mycb = calibrater() 

12myms = ms() 

13 

14 

15@sdutil.sdtask_decorator 

16def importasap(infile=None, outputvis=None, flagbackup=None, overwrite=None, parallel=None): 

17 """ 

18 """ 

19 

20 try: 

21 if infile is None or outputvis is None: 

22 raise RuntimeError('Error: infile and outputvis must be specified.') 

23 

24 # default value 

25 if flagbackup is None: 

26 flagbackup = True 

27 

28 if overwrite is None: 

29 overwrite = False 

30 

31 # basic check 

32 if os.path.exists(outputvis) and not overwrite: 

33 raise RuntimeError('%s exists.' % (outputvis)) 

34 

35 if not _is_scantable(infile): 

36 raise RuntimeError('%s is not a valid Scantable.' % (infile)) 

37 

38 # import 

39 status = mysdms.importasap(infile, outputvis, parallel) 

40 

41 if status: 

42 # flagversions file must be deleted 

43 flagversions = outputvis.rstrip('/') + '.flagversions' 

44 if os.path.exists(flagversions): 

45 os.system('rm -rf %s' % (flagversions)) 

46 

47 # initialize weights using cb tool 

48 mycb.open(outputvis, compress=False, addcorr=False, addmodel=False) 

49 mycb.initweights(wtmode='nyq') 

50 

51 # create flagbackup file if user requests it 

52 if flagbackup: 

53 aflocal = agentflagger() 

54 aflocal.open(outputvis) 

55 aflocal.saveflagversion( 

56 'Original', 

57 comment='Original flags at import into CASA using importasap', 

58 merge='save') 

59 aflocal.done() 

60 else: 

61 raise RuntimeError('Failure in importasap') 

62 

63 # Write history to output MS 

64 param_names = importasap.__code__.co_varnames[:importasap.__code__.co_argcount] 

65 vars = locals() 

66 param_vals = [vars[p] for p in param_names] 

67 write_history(myms, outputvis, 'importasap', param_names, 

68 param_vals, casalog) 

69 

70 finally: 

71 mycb.close() 

72 

73 

74def _is_scantable(filename): 

75 """Check if given data is Scantable or not.""" 

76 ret = False 

77 if os.path.isdir(filename) and os.path.exists(filename + '/table.info') \ 

78 and os.path.exists(filename + '/table.dat'): 

79 with open(filename + '/table.info') as f: 

80 lines = f.readline() 

81 f.close() 

82 match_pattern = '^Type = (Scantable)? *$' 

83 ret = re.match(match_pattern, lines) is not None 

84 return ret