Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_importnro.py: 85%

47 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1import datetime 

2import os 

3import sys 

4import shutil 

5 

6from casatasks import casalog 

7from casatools import calibrater, ms, singledishms 

8 

9from . import sdutil 

10from .mstools import write_history 

11 

12mysdms = singledishms() 

13mycb = calibrater() 

14myms = ms() 

15 

16 

17@sdutil.sdtask_decorator 

18def importnro(infile=None, outputvis=None, overwrite=None, parallel=None): 

19 """ 

20 """ 

21 status = True 

22 

23 try: 

24 outputvis_temp = outputvis + '-backup-' + datetime.datetime.now().strftime('%Y%m%d-%H%M%S') 

25 

26 if os.path.exists(outputvis): 

27 if overwrite: 

28 os.rename(outputvis, outputvis_temp) 

29 else: 

30 raise RuntimeError('%s exists.' % (outputvis)) 

31 

32 if not _is_nostar(infile): 

33 raise RuntimeError('%s is not a valid NOSTAR data.' % (infile)) 

34 

35 status = mysdms.importnro(infile, outputvis, parallel) 

36 

37 if status: 

38 # initialize weights using cb tool 

39 mycb.open(outputvis, compress=False, addcorr=False, addmodel=False) 

40 mycb.initweights(wtmode='nyq') 

41 if os.path.exists(outputvis_temp): 

42 shutil.rmtree(outputvis_temp) 

43 else: 

44 if os.path.exists(outputvis): 

45 shutil.rmtree(outputvis) 

46 if os.path.exists(outputvis_temp): 

47 os.rename(outputvis_temp, outputvis) 

48 raise RuntimeError('import failed.') 

49 

50 # Write parameters to HISTORY table of MS 

51 param_names = importnro.__code__.co_varnames[:importnro.__code__.co_argcount] 

52 vars = locals() 

53 param_vals = [vars[p] for p in param_names] 

54 write_history(myms, outputvis, 'importnro', param_names, 

55 param_vals, casalog) 

56 

57 finally: 

58 if status: 

59 mycb.close() 

60 

61 

62def _is_nostar(filename): 

63 """Check if given data is NOSTAR or not.""" 

64 ret = False 

65 if os.path.getsize(filename) >= 15136: # size of observation header 

66 with open(filename, 'rb') as f: 

67 if f.read(8).replace(b'\x00', b'') == b'RW': 

68 ret = (f.read(15136 - 8 + 4)[-4:].replace(b'\x00', b'').decode( ) == 'LS') 

69 f.close() 

70 return ret