Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/ialib.py: 89%

72 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1 

2import os 

3from stat import S_ISDIR, ST_MODE 

4 

5from casatools import image 

6from casatools import ctsys 

7 

8from .. import casalog 

9from . import cvt 

10 

11locimage = image 

12 

13def _getvestr(): 

14 return 'version: ' + ctsys.version_info( ) 

15 

16def write_image_history(myia, tname, param_names, param_vals, myclog=None): 

17 """ 

18 Update image attached to image tool with the parameters that task tname was called with. 

19 

20 myia - attached image tool or image name (string) 

21 tname - name of the calling task. 

22 param_names - list of parameter names. 

23 param_vals - list of parameter values (in the same order as param_names). 

24 myclog - a casalog instance (optional) 

25 """ 

26 

27 param_names = cvt.as_list(param_names) 

28 param_vals = cvt.as_list(param_vals) 

29 

30 myia_is_string = type(myia) == str 

31 if myia_is_string: 

32 if not myia: 

33 # empty string 

34 return 

35 _ia = locimage() 

36 _ia.open(myia) 

37 elif not hasattr(myia, 'sethistory'): 

38 return False 

39 else: 

40 _ia = myia 

41 try: 

42 if not myclog and hasattr(casalog, 'post'): 

43 myclog = casalog 

44 except Exception as instance: 

45 # There's no logger to complain to, and I don't want to exit 

46 # just because of that. 

47 pass 

48 try: 

49 vestr = _getvestr() 

50 _ia.sethistory(tname, vestr) 

51 

52 # Write the arguments. 

53 s = tname + "(" 

54 n = len(param_names) 

55 for argnum in range(n): 

56 s += str(param_names[argnum]) + "=" 

57 val = param_vals[argnum] 

58 sval = str(val) 

59 if len(sval) > 300: 

60 s += "..." 

61 else: 

62 if type(val) == str: 

63 s += '"' 

64 s += sval 

65 if type(val) == str: 

66 s += '"' 

67 if argnum < n-1: 

68 s += ", " 

69 s += ")" 

70 _ia.sethistory(tname, s) 

71 except Exception as instance: 

72 if hasattr(myclog, 'post'): 

73 myclog.post("*** Error \"%s\" updating HISTORY of " % (instance), 

74 'SEVERE') 

75 return False 

76 finally: 

77 if myia_is_string: 

78 _ia.done() 

79 return True 

80 

81def get_created_images(outfile, target_time): 

82 dirpath = os.path.dirname(outfile) 

83 if not dirpath: 

84 dirpath = "." 

85 base = os.path.basename(outfile) 

86 # get all entries in the directory w/ stats 

87 entries = [] 

88 for fn in os.listdir(dirpath): 

89 if os.path.basename(fn).startswith(base): 

90 entries.append((os.stat(fn), fn)) 

91 # leave only directories, insert creation date 

92 entries = ((stat.st_mtime, path) 

93 for stat, path in entries if S_ISDIR(stat[ST_MODE])) 

94 # reverse sort by time 

95 zz = sorted(entries) 

96 zz.reverse() 

97 created_images = [] 

98 for mdate, path in zz: 

99 # kludge because all of a sudden, some mdates are less than time.time() value 

100 # that was gotten before these files were created on OSX. Weird. 

101 if mdate < target_time - 1: 

102 break 

103 if os.path.basename(path).startswith(base): 

104 created_images.append(path) 

105 return created_images 

106