Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/task_specflux.py: 85%
125 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1##########################################################################
2# task_specflux.py
3#
4# Copyright (C) 2008, 2009, 2010
5# Associated Universities, Inc. Washington DC, USA.
6#
7# This script is free software; you can redistribute it and/or modify it
8# under the terms of the GNU Library General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or (at your
10# option) any later version.
11#
12# This library is distributed in the hope that it will be useful, but WITHOUT
13# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
15# License for more details.
16#
17# You should have received a copy of the GNU Library General Public License
18# along with this library; if not, write to the Free Software Foundation,
19# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
20#
21# Correspondence concerning AIPS++ should be adressed as follows:
22# Internet email: casa-feedback@nrao.edu.
23# Postal address: AIPS++ Project Office
24# National Radio Astronomy Observatory
25# 520 Edgemont Road
26# Charlottesville, VA 22903-2475 USA
27#
28# <author>
29# Dave Mehringer
30# </author>
31#
33###########################################################################
34import os.path
35import numpy
37from casatools import image, regionmanager, quanta
38from casatasks import casalog
39from .ialib import write_image_history, get_created_images
42def specflux(
43 imagename, region, box, chans, stokes, mask, stretch,
44 function, unit, major, minor, logfile, overwrite
45):
46 casalog.origin('specflux')
47 myia = image()
48 myrg = regionmanager()
49 _qa = quanta()
50 try:
51 if logfile and not overwrite and os.path.exists(logfile):
52 raise Exception(logfile + " exists and overwrite is False")
53 funclower = function.lower()
54 if not (
55 funclower.startswith("f") or funclower.startswith("mea")
56 or funclower.startswith("med") or funclower.startswith("s")
57 ):
58 raise Exception("Unsupported function " + function)
59 if bool(major) != bool(minor):
60 raise Exception("You must specify both of major and minor, or neither of them")
61 myia.open(imagename)
62 bunit = myia.brightnessunit()
63 unit_is_perbeam = bunit.find("/beam") >= 0
64 if not bunit or not (unit_is_perbeam or bunit.endswith("K")):
65 _no_unit_no_beam_message()
66 # we must be able to compute the flux density, this is part of
67 # the requirements. See eg CAS-10791
68 if (unit_is_perbeam and not bool(major) and not bool(myia.restoringbeam())):
69 _no_unit_no_beam_message()
70 try:
71 axis = myia.coordsys().axiscoordinatetypes().index("Spectral")
72 except Exception:
73 raise RuntimeError("Image does not have a spectral coordinate, cannot proceed")
74 if myia.shape()[axis] == 1:
75 raise Exception("This application only supports multi-channel images")
76 csys = myia.coordsys()
77 reg = myrg.frombcs(
78 csys=csys.torecord(), shape=myia.shape(), box=box,
79 chans=chans, stokes=stokes, stokescontrol="a", region=region
80 )
81 if bool(major):
82 if (unit_is_perbeam):
83 myia = myia.subimage()
84 myia.setrestoringbeam(major=major, minor=minor, pa="0deg")
85 else:
86 casalog.post(
87 "Image brightness unit is " + bunit
88 + ". Ignorming major and minor specificaitons.",
89 "WARN"
90 )
91 rec = myia.getprofile(
92 axis=axis, function="flux", region=reg,
93 mask=mask, unit=unit, stretch=stretch
94 )
95 xunit = rec['xUnit']
96 wreg = "region=" + str(region)
97 if box or chans or stokes:
98 wreg = "box=" + box + ", chans=" + chans + ", stokes=" + stokes
99 header = "# " + imagename + ", " + wreg + "\n"
100 beamrec = myia.restoringbeam()
101 if beamrec:
102 if 'major' in beamrec:
103 beamsize = myia.beamarea()
104 header += "# beam size: " + str(beamsize['arcsec2'])
105 header += " arcsec2, " + str(beamsize["pixels"]) + " pixels\n"
106 else:
107 header += "# multiple beams\n"
108 else:
109 header += "# no beam\n"
110 coords = rec['coords']
111 shifted = list(coords)
112 shifted.pop(0)
113 shifted = numpy.array(shifted)
114 increments = shifted - coords[:-1]
115 increments = numpy.append(increments, increments[-1])
116 increments = numpy.abs(increments)
117 fd = rec['values']
118 vals = fd
119 flux = numpy.sum(fd*increments)
120 # header += "# Total flux: " + str(f'{flux:.12g}') + " " + rec['yUnit'] + "." + xunit + "\n"
121 header += "# Total flux: " + '%.12g' % flux + " " + rec['yUnit'] + "." + xunit + "\n"
122 # now compute the requested function
123 real_func = ""
124 agg_title = "Flux_density"
125 yUnit = rec['yUnit']
126 if funclower.startswith("mea"):
127 real_func = "mean"
128 agg_title = "Mean"
129 elif funclower.startswith("med"):
130 real_func = "median"
131 agg_title = "Median"
132 elif funclower.startswith("s"):
133 real_func = "sum"
134 agg_title = "Sum"
135 if len(real_func) > 0:
136 zz = myia.getprofile(
137 axis=axis, function=real_func, region=reg,
138 mask=mask, unit=unit, stretch=stretch
139 )
140 vals = zz['values']
141 yUnit = zz['yUnit']
142 need_freq = True
143 need_vel = True
144 myq = _qa.quantity("1" + xunit)
145 if _qa.convert(myq, "km/s")['unit'] == "km/s":
146 need_vel = False
147 vels = rec['coords']
148 vel_unit = xunit
149 elif _qa.convert(myq, "MHz")['unit'] == "MHz":
150 need_freq = False
151 freqs = rec['coords']
152 freq_unit = xunit
153 if need_vel:
154 vels = myia.getprofile(
155 axis=axis, function="flux", region=reg,
156 mask=mask, unit="km/s", stretch=stretch
157 )['coords']
158 vel_unit = "km/s"
159 if need_freq:
160 freqs = myia.getprofile(
161 axis=axis, function="flux", region=reg,
162 mask=mask, unit="MHz", stretch=stretch
163 )['coords']
164 freq_unit = "MHz"
165 freq_col = "frequency_(" + freq_unit + ")"
166 freq_width = len(freq_col)
167 freq_spec = "%" + str(freq_width) + ".6f"
168 vel_col = "Velocity_(" + vel_unit + ")"
169 vel_width = len(vel_col)
170 vel_spec = "%" + str(vel_width) + ".6f"
171 flux_col = agg_title + "_(" + yUnit + ")"
172 flux_width = max(len(flux_col), 12)
173 flux_spec = "%" + str(flux_width) + ".6e"
174 header += "# Channel number_of_unmasked_pixels " + freq_col
175 header += " " + vel_col
176 header += " " + flux_col + "\n"
177 planes = rec['planes']
178 npix = rec['npix']
180 for i in range(len(rec['values'])):
181 header += "%9d %25d " % (planes[i], npix[i])
182 header += freq_spec % (freqs[i]) + " "
183 header += vel_spec % (vels[i]) + " "
184 header += flux_spec % (vals[i])
185 header += "\n"
186 casalog.post(header, "NORMAL")
187 if (logfile):
188 with open(logfile, "w") as myfile:
189 myfile.write(header)
190 # tasks no longer return bool
191 finally:
192 myia.done()
193 myrg.done()
194 _qa.done()
196def _no_unit_no_beam_message():
197 # CAS-10791
198 raise RuntimeError(
199 "This application is required to do a flux density calculation but cannot "
200 + "because the image has no beam and/or appropriate brightness unit. Please "
201 + "define a beam using the relevant task parameter inputs. To add a beam "
202 + "and brightness unit to your image, use ia.setrestoringbeam() and "
203 + "ia.setbrightnessunit(). To simply return a one-dimensional profile along "
204 + "a specified axis, ia.getprofile() is also available."
205 )