Coverage for /home/casatest/venv/lib/python3.12/site-packages/casatasks/private/task_importfitsidi.py: 6%

122 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-21 07:43 +0000

1import os 

2import shutil 

3import numpy as np 

4 

5from casatools import ms, table 

6from casatasks import casalog 

7from .mstools import write_history 

8 

9def importfitsidi(fitsidifile,vis,constobsid=None,scanreindexgap_s=None,specframe=None,coordframe=None): 

10 """Convert FITS-IDI visibility file into a CASA visibility file (MS). 

11 

12 Keyword arguments: 

13 fitsidifile -- Name(s) of input FITS IDI file(s) 

14 default: None; example='3C273XC1.IDI' or ['3C273XC1.IDI1', '3C273XC1.IDI2'] 

15 vis -- Name of output visibility file (MS) 

16 default: None; example: vis='3C273XC1.ms' 

17  

18 constobsid -- If True a constant obs id == 0 of is given to all input files  

19 default = False (new obs id for each input file) 

20 

21 scanreindexgap_s -- if > 0., a new scan is started whenever the gap between two 

22 integrations is > the given value (seconds) or when a new field starts 

23 or when the ARRAY_ID changes. 

24 default = 0. (no reindexing) 

25 

26 specframe -- this frame will be used to set the spectral reference frame 

27 for all spectral windows in the output MS 

28 default = GEO (geocentric), other options: TOPO, LSRK, BARY 

29 NOTE: if specframe is set to TOPO, the reference location will be taken from 

30 the Observatories table in the CASA data repository for the given name of 

31 the observatory. You can edit that table and add new rows. 

32 

33 coordframe -- this frame will be used to set the reference frame 

34 for all source positions in the output MS 

35 default = '' (set based on opeoch/equinox), 

36 other options: ICRS, B1950, J2000 

37 NOTE: if the epoch/equinox is set to J2000 in the FITS-IDI file, 

38 the reference frame will be set to ICRS. To use FK5/J2000 as the 

39 reference frame, set coordframe to J2000. 

40 """ 

41 

42 #Python script 

43 

44 try: 

45 casalog.origin('importfitsidi') 

46 casalog.post("") 

47 myms = ms() 

48 mytb = table() 

49 

50 if type(specframe)==str and not specframe=='': 

51 myspecframe=specframe.upper() 

52 else: 

53 myspecframe='GEO' 

54 

55 refframes = {'REST': 0, 'LSRK': 1, 'LSRD': 2, 'BARY': 3, 'GEO': 4, 'TOPO': 5} 

56 if myspecframe not in refframes: 

57 raise ValueError('Value '+myspecframe+' of parameter specframe invalid. Possible values are REST, LSRK, LSRD, BARY, GEO, TOPO') 

58 

59 if type(coordframe)==str and not coordframe=='': 

60 mycoordframe=coordframe.upper() 

61 else: 

62 mycoordframe='' 

63 

64 if mycoordframe not in ['', 'ICRS', 'B1950', 'J2000']: 

65 raise ValueError('Value '+mycoordframe+' of parameter coordframe invalid. Possible values are ICRS, B1950, J2000') 

66 

67 if(type(fitsidifile)==str): 

68 casalog.post('### Reading file '+fitsidifile, 'INFO') 

69 myms.fromfitsidi(vis,fitsidifile) 

70 myms.close() 

71 elif(type(fitsidifile)==list): 

72 clist = fitsidifile 

73 casalog.post('### Reading file '+clist[0], 'INFO') 

74 myms.fromfitsidi(vis,clist[0]) 

75 myms.close() 

76 clist.pop(0) 

77 tname = vis+'_importfitsidi_tmp_' 

78 shutil.rmtree(tname, ignore_errors=True) 

79 for fidifile in clist: 

80 casalog.post('### Reading file '+fidifile, 'INFO') 

81 myms.fromfitsidi(tname,fidifile) 

82 myms.close() 

83 myms.open(vis, nomodify=False) 

84 myms.concatenate(msfile=tname, freqtol='', dirtol='') 

85 myms.close() 

86 shutil.rmtree(tname, ignore_errors=True) 

87 else: 

88 raise ValueError('Parameter fitsidifile should be of type str or list') 

89 

90 if (constobsid): 

91 mytb.open(vis+'/OBSERVATION', nomodify=False) 

92 nobs = mytb.nrows() 

93 cando = True 

94 if nobs>1: 

95 casalog.post('Trying to keep obsid constant == 0 for all input files', 'INFO') 

96 # check if all observations are from the same telescope; if not warn and leave as is 

97 tels = mytb.getcol('TELESCOPE_NAME') 

98 for i in range(1,nobs): 

99 if tels[i]!=tels[0]: 

100 cando = False 

101 

102 if cando: 

103 # get min and max time and write them into the first row; 

104 casalog.post('Adjusting OBSERVATION table', 'INFO') 

105 timeranges = mytb.getcol('TIME_RANGE') 

106 newmin = min(timeranges[0]) 

107 newmax = max(timeranges[1]) 

108 mytb.putcell('TIME_RANGE', 0, [newmin,newmax]) 

109 # delete the other rows 

110 mytb.removerows(list(range(1,nobs))) 

111 else: 

112 casalog.post('The input files stem from different telescopes. Need to give different obs id.', 'WARN') 

113 mytb.close() 

114 

115 if cando: 

116 # give the same obs id == 0 to the entire output MS 

117 casalog.post('Setting observation ID of all integrations to 0', 'INFO') 

118 mytb.open(vis, nomodify=False) 

119 for i in range(0, mytb.nrows()): 

120 mytb.putcell('OBSERVATION_ID', i, 0) 

121 mytb.close() 

122 

123 

124 else: # don't want constant obs id 

125 if(type(fitsidifile)==list and len(fitsidifile)>1): 

126 casalog.post('Incrementing observation ID for each input file ...', 'INFO') 

127 

128 if (scanreindexgap_s > 0.): 

129 # reindex the scan column 

130 mytb.open(vis, nomodify=False) 

131 times = mytb.getcol('TIME') 

132 fields = mytb.getcol('FIELD_ID') 

133 arrayids = mytb.getcol('ARRAY_ID') 

134 scannumbers = mytb.getcol('SCAN_NUMBER') 

135 

136 timesorted = np.argsort(np.array(times)) 

137 

138 scannumber = 0 

139 scannumber_field = len(fields) * [0] 

140 prevtime = len(fields) * [0] 

141 prevarrayid = arrayids[timesorted[0]] 

142 

143 for i in range(0,mytb.nrows()): 

144 ii = timesorted[i] 

145 timenow = times[ii] 

146 fieldnow = fields[ii] 

147 arrayidnow = arrayids[ii] 

148 if (timenow-prevtime[fieldnow] > scanreindexgap_s) \ 

149 or (arrayidnow != prevarrayid): 

150 scannumber += 1 

151 scannumber_field[fieldnow] = scannumber 

152 casalog.post("Starting new scan "+str(scannumber)+" at "+str(timenow)\ 

153 +", field "+str(fieldnow)+", array_id "+str(arrayidnow), 'INFO') 

154 scannumbers[ii] = scannumber_field[fieldnow] 

155 prevtime[fieldnow] = timenow 

156 prevarrayid = arrayidnow 

157 

158 mytb.putcol('SCAN_NUMBER', scannumbers) 

159 mytb.close() 

160 

161 if myspecframe in refframes: 

162 casalog.post('Setting reference frame for all spectral windows to '+myspecframe, 'INFO') 

163 if myspecframe == 'TOPO': 

164 casalog.post('NOTE: reference position for TOPO frame will be the observatory location', 'WARN') 

165 mytb.open(vis+'/SPECTRAL_WINDOW', nomodify=False) 

166 refcol = mytb.getcol('MEAS_FREQ_REF') 

167 refcol = [refframes[myspecframe]]*len(refcol) 

168 mytb.putcol('MEAS_FREQ_REF', refcol) 

169 mytb.close() 

170 

171 mytb.open(vis+'/FIELD', nomodify=False) 

172 for colname in ['DELAY_DIR', 'PHASE_DIR', 'REFERENCE_DIR']: 

173 kwds = mytb.getcolkeywords(colname) 

174 if not mycoordframe=='': 

175 kwds['MEASINFO']['Ref'] = mycoordframe 

176 # until casacore gets fixed to use ICRS as its default 

177 elif mycoordframe=='' and kwds['MEASINFO']['Ref'] == 'J2000': 

178 kwds['MEASINFO']['Ref'] = 'ICRS' 

179 mytb.putcolkeywords(colname, kwds) 

180 mytb.close() 

181 

182 # write history 

183 try: 

184 param_names = importfitsidi.__code__.co_varnames[:importfitsidi.__code__.co_argcount] 

185 vars = locals( ) 

186 param_vals = [vars[p] for p in param_names] 

187 write_history(myms, vis, 'importfitsidi', param_names, param_vals, casalog) 

188 

189 except Exception as instance: 

190 casalog.post("*** Error \'%s\' updating HISTORY" % instance, 'WARN') 

191 

192 finally: 

193 shutil.rmtree(vis+'_importfitsidi_tmp_', ignore_errors=True)