Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/tests/test_casatasks.py: 0%
125 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-10-31 19:53 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-10-31 19:53 +0000
1#########################################################################
2# test_casatasks.py
3#
4# Copyright (C) 2018
5# Associated Universities, Inc. Washington DC, USA.
6#
7# This script is free software; you can redistribute it and/or modify it
8# under the terms of the GNU Library General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or (at your
10# option) any later version.
11#
12# This library is distributed in the hope that it will be useful, but WITHOUT
13# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
15# License for more details.
16#
17# Based on the requirements listed in casadocs found in:
18# https://casadocs.readthedocs.io/en/stable/api/tt/casatasks.html
19#
20##########################################################################
21import os
22import unittest
23import numpy as np
24from casatasks import importasdm, importfits, listobs, flagdata, gencal, setjy, fluxscale
25from casatasks import gaincal, bandpass, mstransform, tclean, immoments, sdcal
26from casatestutils import sparse_check
class BaseClass(unittest.TestCase):
    """Base class with helper functions."""

    @staticmethod
    def getdata(testfiles=None):
        """Download the data needed by the tests.

        Declared as a static method because it uses neither instance nor
        class state; this makes both ``cls.getdata(...)`` and
        ``self.getdata(...)`` valid ways to call it.

        Args:
            testfiles: Forwarded unchanged to ``sparse_check.download_data``.
                Presumably the names of the data sets to fetch -- confirm
                against the casatestutils documentation.
        """
        # Download data for the tests
        sparse_check.download_data(testfiles)
class CasaTasksTests(BaseClass):
    """Unit tests for CASA tasks.

    The test script will download test data automatically to
    the local directory and will remove them afterwards.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Define input/output names and fetch the input data once per class."""
        # Input data
        cls.asdm = 'AutocorrASDM'
        cls.fitsimage = 'two_gaussian_model.fits'
        cls.listobs_ms = 'uid___X02_X3d737_X1_01_small.ms'
        cls.flagdata_ms = 'ngc5921.ms'
        cls.gencal_ms = 'tdem0003gencal.ms'
        # Same data set as flagdata_ms, hence not listed again in input_files.
        cls.setjy_ms = 'ngc5921.ms'
        cls.fluxscale_ms = 'CalMSwithModel.ms'
        cls.fluxscale_gtable = 'ModelGcal.G0'
        cls.gaincal_ms = 'gaincaltest2.ms'
        cls.split_ms = 'Four_ants_3C286.ms'
        cls.tclean_ms = 'refim_oneshiftpoint.mosaic.ms'
        cls.immoments_img = 'n1333_both.image'
        cls.sdcal_ms = "otf_ephem.ms"
        cls.input_files = [
            cls.asdm, cls.fitsimage, cls.listobs_ms, cls.flagdata_ms,
            cls.gencal_ms, cls.fluxscale_ms, cls.fluxscale_gtable,
            cls.gaincal_ms, cls.split_ms, cls.tclean_ms, cls.immoments_img,
            cls.sdcal_ms,
        ]
        # Fetch input data
        cls.getdata(testfiles=cls.input_files)

        # Output data
        cls.asdm_ms = f'{cls.asdm}.ms'
        cls.onlineflags = 'onlineflags.txt'
        cls.casaimage = f'{cls.fitsimage}.image'
        cls.gentable = 'gencal_antpos.cal'
        cls.fluxscale_out = 'fluxout.cal'
        cls.gaincal_out = 'gaincaltable.cal'
        cls.bandpass_out = 'bandpass.bcal'
        cls.split_out = 'split_model.ms'
        # Name prefix shared by all tclean products; they are removed with a
        # wildcard in tearDownClass, so the prefix is not part of output_files.
        cls.tclean_img = 'tclean_test_'
        cls.immoments_out = 'immoment.mom0'
        cls.sdcal_out = 'otf_ephem.ms.otfcal'
        cls.output_files = [
            cls.asdm_ms, cls.onlineflags, cls.casaimage, cls.gentable,
            cls.fluxscale_out, cls.gaincal_out, cls.bandpass_out,
            cls.split_out, cls.immoments_out, cls.sdcal_out,
        ]

    @staticmethod
    def _remove_path(path):
        """Best-effort removal of a file, symlink or directory tree.

        Mirrors ``rm -rf``: a missing path is not an error and failures are
        ignored, so cleanup never turns a passing run into a failing one.
        """
        import shutil

        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path, ignore_errors=True)
            return
        try:
            os.remove(path)
        except OSError:
            # Deliberately ignored: cleanup is best effort, as with ``rm -f``.
            pass

    @classmethod
    def tearDownClass(cls) -> None:
        """Remove the downloaded inputs and every product written by the tests."""
        import glob

        # Remove input files
        for inpfile in cls.input_files:
            cls._remove_path(inpfile)

        # Remove output files
        for outfile in cls.output_files:
            cls._remove_path(outfile)
        # tclean writes several images that all start with the same prefix.
        for product in glob.glob(f'{glob.escape(cls.tclean_img)}*'):
            cls._remove_path(product)

    def setUp(self):
        # The test docstrings are printed here, so they are part of the
        # runtime output of this script.
        print(f"{self._testMethodName}: {self._testMethodDoc}")

    def test_importasdm_autocorrelations(self):
        """Test importasdm on autocorrelation ASDM with scan selection and saving online flags"""
        # Use default name for output MS
        self.assertIsNone(
            importasdm(asdm=self.asdm, scans='3', savecmds=True, outfile=self.onlineflags,
                       flagbackup=False))
        self.assertTrue(os.path.exists(self.onlineflags))
        with open(self.onlineflags, 'r') as ff:
            cmds = ff.readlines()
        self.assertTrue(cmds, "No flag commands were written to the online flags file")
        # auto-correlation should have been written to online flags
        self.assertIn('&&*', cmds[0])

    def test_importfits_beam(self):
        """Test importfits on data with synthesized beam"""
        importfits(fitsimage=self.fitsimage, imagename=self.casaimage,
                   beam=['0.35arcsec', '0.24arcsec', '25deg'])
        self.assertTrue(os.path.exists(self.casaimage), "Output image does not exist")

    def test_listobs_field_selection(self):
        """Test listobs on MS with field selection"""
        res = listobs(vis=self.listobs_ms, field='1')
        self.assertEqual(res['nfields'], 1)

    def test_flagdata_manual(self):
        """Test flagdata with manual mode"""
        # unittest assertion methods are used instead of bare ``assert`` so the
        # checks still run under ``python -O`` and report the compared values.
        flagdata(vis=self.flagdata_ms, correlation='LL', savepars=False, flagbackup=False)
        flagdata(vis=self.flagdata_ms, spw='0:17~19', savepars=False, flagbackup=False)
        flagdata(vis=self.flagdata_ms, antenna='VA05&&VA09', savepars=False, flagbackup=False)
        flagdata(vis=self.flagdata_ms, field='1', savepars=False, flagbackup=False)

        summary = flagdata(vis=self.flagdata_ms, mode='summary', minrel=0.9, spwchan=True,
                           basecnt=True)
        self.assertIn('VA05&&VA09', summary['baseline'])
        self.assertEqual(set(summary['spw:channel']), {'0:17', '0:18', '0:19'})
        self.assertEqual(list(summary['correlation'].keys()), ['LL'])  # LL
        self.assertEqual(list(summary['field'].keys()), ['1445+09900002_0'])
        self.assertEqual(set(summary['scan']), {'2', '4', '5', '7'})

        summary = flagdata(vis=self.flagdata_ms, mode='summary', maxrel=0.8)
        self.assertEqual(set(summary['field']), {'1331+30500002_0', 'N5921_2'})

        summary = flagdata(vis=self.flagdata_ms, mode='summary', minabs=400000)
        self.assertEqual(set(summary['scan']), {'3', '6'})

        summary = flagdata(vis=self.flagdata_ms, mode='summary', minabs=400000, maxabs=450000)
        self.assertEqual(list(summary['scan'].keys()), ['3'])

    def test_flagdata_quack(self):
        """Test flagdata with quack mode"""
        # Start from a clean slate: other tests flag the same measurement set.
        flagdata(vis=self.flagdata_ms, mode='unflag', flagbackup=False)
        flagdata(vis=self.flagdata_ms, mode='quack', quackmode='beg', quackinterval=1,
                 flagbackup=False)
        self.assertEqual(flagdata(vis=self.flagdata_ms, mode='summary')['flagged'], 329994)

    def test_gencal_antpos_manual(self):
        """Test gencal with manual antenna position correction"""
        gencal(vis=self.gencal_ms, caltable=self.gentable, caltype='antpos',
               antenna='ea12,ea22', parameter=[-0.0072, 0.0045, -0.0017, -0.0220, 0.0040, -0.0190])
        self.assertTrue(os.path.exists(self.gentable))

    def test_setjy_flux_standard(self):
        """Test setjy with input dictionary for fluxscale standard"""
        fluxscaledict = {
            '1': {
                '0': {
                    'fluxd': np.array([2.48362403, 0., 0., 0.]),
                    'fluxdErr': np.array([0.00215907, 0., 0., 0.]),
                    'numSol': np.array([54., 0., 0., 0.]),
                },
                'fieldName': '1445+09900002_0',
                'fitFluxd': 0.0,
                'fitFluxdErr': 0.0,
                'fitRefFreq': 0.0,
                'spidx': np.array([0., 0., 0.]),
                'spidxerr': np.array([0., 0., 0.]),
            },
            'freq': np.array([1.41266507e+09]),
            'spwID': np.array([0], dtype=np.int32),
            'spwName': np.array(['none'], dtype='|S5'),
        }

        retdict = setjy(vis=self.setjy_ms, standard='fluxscale', fluxdict=fluxscaledict,
                        usescratch=False)
        # The third positional argument of assertEqual is the failure message,
        # not a tolerance; compare the float with an explicit delta instead.
        self.assertAlmostEqual(retdict['1']['0']['fluxd'][0], 2.48362403, delta=0.0001)

    def test_fluxscale_refspwmap(self):
        """Test fluxscale with a refspwmap parameter"""
        fluxscale(vis=self.fluxscale_ms, caltable=self.fluxscale_gtable,
                  fluxtable=self.fluxscale_out, reference=['0'], refspwmap=[1, 1, 1, 1])
        self.assertTrue(os.path.exists(self.fluxscale_out))

    def test_gaincal_gaintype_g(self):
        """Test gaincal using gaintype G and flagged antennas"""
        gaincal(vis=self.gaincal_ms, caltable=self.gaincal_out, refant='0', field='0',
                solint='inf', combine='scan', antenna='0~5&', smodel=[1, 0, 0, 0], gaintype='G')
        self.assertTrue(os.path.exists(self.gaincal_out))

    def test_bandpass_solint_inf(self):
        """Test bandpass using solint=inf using a field selection"""
        # Runs on the same measurement set as the flagdata tests.
        bandpass(vis=self.flagdata_ms, caltable=self.bandpass_out, field='0', uvrange='>0.0',
                 bandtype='B', solint='inf', combine='scan', refant='VA15')
        self.assertTrue(os.path.exists(self.bandpass_out))

    def test_mstransform_split_model_col(self):
        """Test mstransform to split out the MODEL column"""
        from casatools import table

        mytb = table()
        mytb.open(self.split_ms)
        cols = mytb.colnames()
        mytb.done()
        # The input must carry a MODEL_DATA column for the split to be meaningful.
        self.assertIn("MODEL_DATA", cols)

        mstransform(vis=self.split_ms, outputvis=self.split_out, field='1', spw='0:0~61',
                    datacolumn='model')
        self.assertTrue(os.path.exists(self.split_out))

        mytb.open(self.split_out)
        cols = mytb.colnames()
        mytb.done()
        # The output is expected not to have a MODEL_DATA column of its own.
        self.assertNotIn("MODEL_DATA", cols)

    def test_tclean_mtmfs_mosaic_cbFalse_onefield(self):
        """Test tclean with mosaic gridder and specmode mfs"""
        tclean(vis=self.tclean_ms, imagename=self.tclean_img, niter=0, specmode='mfs', spw='*',
               imsize=1024, phasecenter='', cell='10.0arcsec', gridder='mosaic', field='0',
               conjbeams=False, wbawp=True, psterm=False, pblimit=0.1, deconvolver='mtmfs',
               nterms=2, reffreq='1.5GHz', pbcor=False, parallel=False)
        # Each of these first-Taylor-term products must exist after the run.
        for product in ('image', 'pb', 'psf', 'residual', 'weight'):
            with self.subTest(product=product):
                self.assertTrue(os.path.exists(f'{self.tclean_img}.{product}.tt0'))

    def test_immoments_box_parameter(self):
        """Test immoments calculation of each type of moment"""
        # NOTE(review): only moment 0 is requested and no box is passed, which
        # does not match the test name/docstring -- confirm the intent.
        immoments(imagename=self.immoments_img, moments=[0], axis='spec', chans='2~15',
                  includepix=[0.003, 100.0], excludepix=[-1], outfile=self.immoments_out)
        self.assertTrue(os.path.exists(self.immoments_out))

    def test_sdcal_otf_ephemeris(self):
        """Test sdcal on-the-fly sky calibration with ephemeris object"""
        sdcal(infile=self.sdcal_ms, outfile=self.sdcal_out, calmode='otf')
        self.assertTrue(os.path.exists(self.sdcal_out))
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()