Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/tests/test_casatasks.py: 0%

125 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-01 07:19 +0000

1######################################################################### 

2# test_casatasks.py 

3# 

4# Copyright (C) 2018 

5# Associated Universities, Inc. Washington DC, USA. 

6# 

7# This script is free software; you can redistribute it and/or modify it 

8# under the terms of the GNU Library General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or (at your 

10# option) any later version. 

11# 

12# This library is distributed in the hope that it will be useful, but WITHOUT 

13# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 

14# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 

15# License for more details. 

16# 

17# Based on the requirements listed in casadocs found in: 

18# https://casadocs.readthedocs.io/en/stable/api/tt/casatasks.html 

19# 

20########################################################################## 

21import os 

22import unittest 

23import numpy as np 

24from casatasks import importasdm, importfits, listobs, flagdata, gencal, setjy, fluxscale 

25from casatasks import gaincal, bandpass, mstransform, tclean, immoments, sdcal 

26from casatestutils import sparse_check 

27 

28 

class BaseClass(unittest.TestCase):
    """Base class with helper functions shared by the task tests."""

    @staticmethod
    def getdata(testfiles=None):
        """Download the data needed by the tests.

        Declared as a static method so that it behaves the same whether it is
        called on the class (as ``setUpClass`` does) or on an instance; as a
        plain function an instance call would bind ``self`` to ``testfiles``.

        Args:
            testfiles: Handed unchanged to ``sparse_check.download_data``.
                Presumably a list of data file names -- confirm against
                casatestutils.
        """
        # Download data for the tests
        sparse_check.download_data(testfiles)

34 

35 

class CasaTasksTests(BaseClass):
    """Unit tests for CASA tasks.

    The test data are downloaded into the current directory once per class
    (``setUpClass``) and both the inputs and every product written by the
    tests are removed again afterwards (``tearDownClass``).

    NOTE: ``setUp`` prints each test method's docstring, so those docstrings
    are part of the run-time output and are kept verbatim.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Define input/output file names and fetch the input data."""
        # Input data
        cls.asdm = 'AutocorrASDM'
        cls.fitsimage = 'two_gaussian_model.fits'
        cls.listobs_ms = 'uid___X02_X3d737_X1_01_small.ms'
        cls.flagdata_ms = 'ngc5921.ms'
        cls.gencal_ms = 'tdem0003gencal.ms'
        # Same MS as flagdata_ms, hence not listed again in input_files
        cls.setjy_ms = 'ngc5921.ms'
        cls.fluxscale_ms = 'CalMSwithModel.ms'
        cls.fluxscale_gtable = 'ModelGcal.G0'
        cls.gaincal_ms = 'gaincaltest2.ms'
        cls.split_ms = 'Four_ants_3C286.ms'
        cls.tclean_ms = 'refim_oneshiftpoint.mosaic.ms'
        cls.immoments_img = 'n1333_both.image'
        cls.sdcal_ms = "otf_ephem.ms"
        cls.input_files = [
            cls.asdm, cls.fitsimage, cls.listobs_ms, cls.flagdata_ms,
            cls.gencal_ms, cls.fluxscale_ms, cls.fluxscale_gtable,
            cls.gaincal_ms, cls.split_ms, cls.tclean_ms, cls.immoments_img,
            cls.sdcal_ms,
        ]
        # Fetch input data
        cls.getdata(testfiles=cls.input_files)

        # Output data
        cls.asdm_ms = f'{cls.asdm}.ms'
        cls.onlineflags = 'onlineflags.txt'
        cls.casaimage = f'{cls.fitsimage}.image'
        cls.gentable = 'gencal_antpos.cal'
        cls.fluxscale_out = 'fluxout.cal'
        cls.gaincal_out = 'gaincaltable.cal'
        cls.bandpass_out = 'bandpass.bcal'
        cls.split_out = 'split_model.ms'
        # Prefix only: tclean products are removed with a glob in tearDownClass
        cls.tclean_img = 'tclean_test_'
        cls.immoments_out = 'immoment.mom0'
        cls.sdcal_out = 'otf_ephem.ms.otfcal'
        cls.output_files = [
            cls.asdm_ms, cls.onlineflags, cls.casaimage, cls.gentable,
            cls.fluxscale_out, cls.gaincal_out, cls.bandpass_out,
            cls.split_out, cls.immoments_out, cls.sdcal_out,
        ]

    @classmethod
    def tearDownClass(cls) -> None:
        """Remove the downloaded inputs and everything the tests produced."""
        import glob  # local import: only needed for the clean-up

        # Remove input files
        for inpfile in cls.input_files:
            cls._remove_path(inpfile)

        # Remove output files
        for outfile in cls.output_files:
            cls._remove_path(outfile)
        for image in glob.glob(f'{cls.tclean_img}*'):
            cls._remove_path(image)

    @staticmethod
    def _remove_path(path):
        """Best-effort removal of a file, symlink or directory tree.

        Replaces the former ``rm -rf`` shell call: missing paths are ignored
        and a failure to delete never aborts the clean-up.
        """
        import shutil  # local import: only needed for the clean-up

        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path, ignore_errors=True)
            return
        try:
            # lexists() so that dangling symlinks are removed as well
            if os.path.lexists(path):
                os.remove(path)
        except OSError as exc:
            print(f"Could not remove {path}: {exc}")

    def setUp(self):
        """Announce the test about to run (name and docstring)."""
        print(f"{self._testMethodName}: {self._testMethodDoc}")

    def test_importasdm_autocorrelations(self):
        """Test importasdm on autocorrelation ASDM with scan selection and saving online flags"""
        # Use default name for output MS
        result = importasdm(asdm=self.asdm, scans='3', savecmds=True,
                            outfile=self.onlineflags, flagbackup=False)
        self.assertIsNone(result)
        self.assertTrue(os.path.exists(self.onlineflags))
        with open(self.onlineflags, 'r') as ff:
            cmds = ff.readlines()
        self.assertTrue(cmds, "No online flag commands were written")
        # auto-correlation should have been written to online flags
        self.assertIn('&&*', cmds[0])

    def test_importfits_beam(self):
        """Test importfits on data with synthesized beam"""
        importfits(fitsimage=self.fitsimage, imagename=self.casaimage,
                   beam=['0.35arcsec', '0.24arcsec', '25deg'])
        self.assertTrue(os.path.exists(self.casaimage), "Output image does not exist")

    def test_listobs_field_selection(self):
        """Test listobs on MS with field selection"""
        res = listobs(vis=self.listobs_ms, field='1')
        self.assertEqual(res['nfields'], 1)

    def test_flagdata_manual(self):
        """Test flagdata with manual mode"""
        flagdata(vis=self.flagdata_ms, correlation='LL', savepars=False, flagbackup=False)
        flagdata(vis=self.flagdata_ms, spw='0:17~19', savepars=False, flagbackup=False)
        flagdata(vis=self.flagdata_ms, antenna='VA05&&VA09', savepars=False, flagbackup=False)
        flagdata(vis=self.flagdata_ms, field='1', savepars=False, flagbackup=False)

        # unittest assertions instead of bare ``assert``: they survive
        # ``python -O`` and report both operands on failure.
        summary = flagdata(vis=self.flagdata_ms, mode='summary', minrel=0.9,
                           spwchan=True, basecnt=True)
        self.assertIn('VA05&&VA09', summary['baseline'])
        self.assertEqual(set(summary['spw:channel']), {'0:17', '0:18', '0:19'})
        self.assertEqual(list(summary['correlation'].keys()), ['LL'])
        self.assertEqual(list(summary['field'].keys()), ['1445+09900002_0'])
        self.assertEqual(set(summary['scan']), {'2', '4', '5', '7'})

        summary = flagdata(vis=self.flagdata_ms, mode='summary', maxrel=0.8)
        self.assertEqual(set(summary['field']), {'1331+30500002_0', 'N5921_2'})

        summary = flagdata(vis=self.flagdata_ms, mode='summary', minabs=400000)
        self.assertEqual(set(summary['scan']), {'3', '6'})

        summary = flagdata(vis=self.flagdata_ms, mode='summary', minabs=400000,
                           maxabs=450000)
        self.assertEqual(list(summary['scan'].keys()), ['3'])

    def test_flagdata_quack(self):
        """Test flagdata with quack mode"""
        # Start from a clean slate: other tests flag the same MS
        flagdata(vis=self.flagdata_ms, mode='unflag', flagbackup=False)
        flagdata(vis=self.flagdata_ms, mode='quack', quackmode='beg', quackinterval=1,
                 flagbackup=False)
        self.assertEqual(flagdata(vis=self.flagdata_ms, mode='summary')['flagged'], 329994)

    def test_gencal_antpos_manual(self):
        """Test gencal with manual antenna position correction"""
        gencal(vis=self.gencal_ms, caltable=self.gentable, caltype='antpos',
               antenna='ea12,ea22',
               parameter=[-0.0072, 0.0045, -0.0017, -0.0220, 0.0040, -0.0190])
        self.assertTrue(os.path.exists(self.gentable))

    def test_setjy_flux_standard(self):
        """Test setjy with input dictionary for fluxscale standard"""
        fluxscaledict = {
            '1': {
                '0': {
                    'fluxd': np.array([2.48362403, 0., 0., 0.]),
                    'fluxdErr': np.array([0.00215907, 0., 0., 0.]),
                    'numSol': np.array([54., 0., 0., 0.]),
                },
                'fieldName': '1445+09900002_0',
                'fitFluxd': 0.0,
                'fitFluxdErr': 0.0,
                'fitRefFreq': 0.0,
                'spidx': np.array([0., 0., 0.]),
                'spidxerr': np.array([0., 0., 0.]),
            },
            'freq': np.array([1.41266507e+09]),
            'spwID': np.array([0], dtype=np.int32),
            'spwName': np.array(['none'], dtype='|S5'),
        }

        retdict = setjy(vis=self.setjy_ms, standard='fluxscale', fluxdict=fluxscaledict,
                        usescratch=False)
        # The tolerance must be passed as ``delta``; as a third positional
        # argument of assertEqual it would merely be the failure message.
        self.assertAlmostEqual(retdict['1']['0']['fluxd'][0], 2.48362403, delta=0.0001)

    def test_fluxscale_refspwmap(self):
        """Test fluxscale with a refspwmap parameter"""
        fluxscale(vis=self.fluxscale_ms, caltable=self.fluxscale_gtable,
                  fluxtable=self.fluxscale_out, reference=['0'], refspwmap=[1, 1, 1, 1])
        self.assertTrue(os.path.exists(self.fluxscale_out))

    def test_gaincal_gaintype_g(self):
        """Test gaincal using gaintype G and flagged antennas"""
        gaincal(vis=self.gaincal_ms, caltable=self.gaincal_out, refant='0', field='0',
                solint='inf', combine='scan', antenna='0~5&', smodel=[1, 0, 0, 0],
                gaintype='G')
        self.assertTrue(os.path.exists(self.gaincal_out))

    def test_bandpass_solint_inf(self):
        """Test bandpass using solint=inf using a field selection"""
        bandpass(vis=self.flagdata_ms, caltable=self.bandpass_out, field='0',
                 uvrange='>0.0', bandtype='B', solint='inf', combine='scan',
                 refant='VA15')
        self.assertTrue(os.path.exists(self.bandpass_out))

    def test_mstransform_split_model_col(self):
        """Test mstransform to split out the MODEL column"""
        from casatools import table
        mytb = table()

        # The input must carry a MODEL_DATA column to begin with
        mytb.open(self.split_ms)
        cols = mytb.colnames()
        mytb.done()
        self.assertIn("MODEL_DATA", cols)

        mstransform(vis=self.split_ms, outputvis=self.split_out, field='1',
                    spw='0:0~61', datacolumn='model')
        self.assertTrue(os.path.exists(self.split_out))

        # The output is expected to have no MODEL_DATA column
        mytb.open(self.split_out)
        cols = mytb.colnames()
        mytb.done()
        self.assertNotIn("MODEL_DATA", cols)

    def test_tclean_mtmfs_mosaic_cbFalse_onefield(self):
        """Test tclean with mosaic gridder and specmode mfs"""
        tclean(vis=self.tclean_ms, imagename=self.tclean_img, niter=0, specmode='mfs',
               spw='*', imsize=1024, phasecenter='', cell='10.0arcsec',
               gridder='mosaic', field='0', conjbeams=False, wbawp=True,
               psterm=False, pblimit=0.1, deconvolver='mtmfs', nterms=2,
               reffreq='1.5GHz', pbcor=False, parallel=False)
        for product in ('image', 'pb', 'psf', 'residual', 'weight'):
            path = f'{self.tclean_img}.{product}.tt0'
            self.assertTrue(os.path.exists(path), f"{path} does not exist")

    def test_immoments_box_parameter(self):
        """Test immoments calculation of each type of moment"""
        # NOTE(review): the name and docstring do not match the call, which
        # passes no box and requests only moment 0 -- confirm the intent.
        immoments(self.immoments_img, moments=[0], axis='spec', chans='2~15',
                  includepix=[0.003, 100.0], excludepix=[-1],
                  outfile=self.immoments_out)
        self.assertTrue(os.path.exists(self.immoments_out))

    def test_sdcal_otf_ephemeris(self):
        """Test sdcal on-the-fly sky calibration with ephemeris object"""
        sdcal(infile=self.sdcal_ms, outfile=self.sdcal_out, calmode='otf')
        self.assertTrue(os.path.exists(self.sdcal_out))

234 

235 

# Run the whole suite when this file is executed directly as a script
if __name__ == "__main__":
    unittest.main()