Coverage for /home/casatest/venv/lib/python3.12/site-packages/casatasks/private/sdbeamutil.py: 12%
443 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-21 07:43 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-21 07:43 +0000
2import numpy as np
3from numpy import cos, sin, sqrt
4import scipy.interpolate
5import scipy.optimize as optimize
6import scipy.signal
7import scipy.special as spspec
9from casatasks import casalog
10from casatools import quanta
13##################################################
14# Prediction of theoretical beam size
15##################################################
16class TheoreticalBeam:
17 """
18 The class to derive the theoretical beam size of an image.
20 Example:
21 bu = sdbeamutil.TheoreticalBeam()
22 # set imaging, antenna, and pointing informations
23 bu.set_antenna('12m',blockage='0.75m',taper=10)
24 bu.set_sampling(['12arcsec','12arcsec'])
25 bu.set_image_param('5arcsec', '115GHz','SF', 6, -1, -1, -1)
26 # print summary of setup to logger
27 bu.summary()
28 # get theoretical beam size of an image.
29 beam = bu.get_beamsize_image()
30 """
32 def __init__(self):
33 self.is_antenna_set = False
34 self.is_kernel_set = False
35 self.is_sampling_set = False
36 self.antenna_diam_m = -1.
37 self.antenna_block_m = 0.0
38 self.taper = 10
39 self.ref_freq = -1.
40 self.kernel_type = ""
41 self.kernel_param = {}
42 self.pa = "0.0deg"
43 self.sampling_arcsec = []
44 self.cell_arcsec = []
46 def __to_arcsec_list(self, angle):
47 """Return a list of angles in arcsec (value only without unit)."""
48 if type(angle) not in [list, tuple, np.ndarray]:
49 angle = [angle]
50 return [self.__to_arcsec(val) for val in angle]
52 def __to_arcsec(self, angle):
53 """Convert angle to arcsec and return the value without unit."""
54 my_qa = quanta()
55 if my_qa.isangle(angle):
56 return my_qa.getvalue(my_qa.convert(angle, "arcsec"))[0]
57 elif my_qa.getunit(angle) == '':
58 return float(angle)
59 else:
60 raise ValueError("Invalid angle: %s" % (str(angle)))
62 def __parse_width(self, val, cell_size_arcsec):
63 """Convert value in unit of arcsec.
65 If val is angle, returns a float value in unit of arcsec.
66 else the unit is assumed to be pixel and multiplied by cell_size_arcsec
67 """
68 my_qa = quanta()
69 if my_qa.isangle(val):
70 return self.__to_arcsec(val)
71 elif my_qa.getunit(val) in ('', 'pixel'):
72 return my_qa.getvalue(val)[0] * cell_size_arcsec
73 else:
74 raise ValueError("Invalid width %s" % str(val))
76 def set_antenna(self, diam, blockage="0.0m", taper=10):
77 """Set parameters to construct antenna beam.
79 antenna diameter and blockage
80 taper: the illumination taper in dB
81 """
82 # try quantity
83 my_qa = quanta()
84 self.antenna_diam_m = my_qa.getvalue(my_qa.convert(diam, "m"))[0]
85 self.antenna_block_m = my_qa.getvalue(my_qa.convert(blockage, "m"))[0]
86 self.taper = taper
87 self.is_antenna_set = True
89 def set_sampling(self, intervals, pa="0deg"):
90 """Set sampling interval of observation.
92 intervals: pointing inteval of observation (['10arcsec','10arcsec'])
93 pa: position angle (NOT USED)
94 """
95 self.pa = pa
96 self.sampling_arcsec = [abs(a) for a in
97 self.__to_arcsec_list(intervals)]
98 self.is_sampling_set = True
100 def set_image_param(self, cell, ref_freq, gridfunction,
101 convsupport, truncate, gwidth, jwidth, is_alma=False):
102 """Set imaging parameters.
104 cell: image pixel size
105 ref_freq: reference frequency of image to calculate beam size
106 gridfunction, convsupport, truncate, gwidth, jwidth:
107 parameters passed to imager
108 is_alma: valid only for PB kernel to use 10.7m
109 """
110 self.ref_freq = ref_freq
111 self.cell_arcsec = [abs(a) for a in
112 self.__to_arcsec_list(cell)]
113 if gridfunction.upper() == "SF":
114 self.__set_sf_kernel(convsupport)
115 elif gridfunction.upper() == "GJINC":
116 self.__set_gjinc_kernel(truncate, gwidth, jwidth)
117 elif gridfunction.upper() == "GAUSS":
118 self.__set_gauss_kernel(truncate, gwidth)
119 elif gridfunction.upper() == "BOX":
120 self.__set_box_kernel(self.cell_arcsec[0])
121 elif gridfunction.upper() == "PB":
122 self.__set_pb_kernel(is_alma)
123 self.is_kernel_set = True
125 def __set_sf_kernel(self, convsupport):
126 """Set SF kernel parameter to the class."""
127 self.kernel_type = "SF"
128 self.kernel_param = dict(convsupport=convsupport)
130 def __set_gjinc_kernel(self, truncate, gwidth, jwidth):
131 """Set GJINC kernel parameter to the class."""
132 self.kernel_type = "GJINC"
133 self.kernel_param = dict(truncate=truncate, gwidth=gwidth, jwidth=jwidth)
135 def __set_gauss_kernel(self, truncate, gwidth):
136 """Set GAUSS kernel parameter to the class."""
137 self.kernel_type = "GAUSS"
138 self.kernel_param = dict(truncate=truncate, gwidth=gwidth)
140 def __set_box_kernel(self, width):
141 """Set BOX kernel parameter to the class."""
142 self.kernel_type = "BOX"
143 self.kernel_param = dict(width=width)
145 def __set_pb_kernel(self, alma=False):
146 """Set PB kernel parameter to the class."""
147 self.kernel_type = "PB"
148 self.kernel_param = dict(alma=alma)
150 def get_beamsize_image(self):
151 """Return FWHM of theoretical beam size in image.
153 The FWHM is derived by fitting gridded beam with Gaussian function.
154 """
155 # assert necessary information is set
156 self.__assert_antenna()
157 self.__assert_kernel()
158 self.__assert_sampling()
159 casalog.post("Calculating theoretical beam size of the image")
160 # construct theoretic beam for image
161 axis, beam = self.get_antenna_beam()
162 casalog.post(
163 f"Length of convolution array={len(axis)}, "
164 f"total width={axis[-1] - axis[0]} arcsec, "
165 f"separation={axis[1] - axis[0]} arcsec",
166 priority="DEBUG1")
167 kernel = self.get_kernel(axis)
168 sampling = self.get_box_kernel(axis, self.sampling_arcsec[0])
169 # convolution
170 gridded = np.convolve(beam, kernel, mode='same')
171 gridded /= max(gridded)
172 result = np.convolve(gridded, sampling, mode='same')
173 result /= max(result)
174 # fwhm_arcsec = findFWHM(axis,result)
175 fwhm_arcsec, dummy = self.gaussfit(axis, result, minlevel=0.0, truncate=False)
176 casalog.post("- initial FWHM of beam = %f arcsec" %
177 findFWHM(axis, beam))
178 casalog.post("- FWHM of gridding kernel = %f arcsec" %
179 findFWHM(axis, kernel))
180 casalog.post("- FWHM of theoretical beam = %f arcsec" %
181 findFWHM(axis, result))
182 casalog.post("- FWHM of theoretical beam (gauss fit) = %f arcsec" %
183 fwhm_arcsec)
184 del result
185 if len(self.sampling_arcsec) == 1 and \
186 (len(self.cell_arcsec) == 1 or self.kernel_type != "BOX"):
187 fwhm_geo_arcsec = fwhm_arcsec
188 else:
189 if len(self.sampling_arcsec) > 1:
190 sampling = self.get_box_kernel(axis, self.sampling_arcsec[1])
191 elif self.kernel_type == "BOX" and len(self.cell_arcsec) > 1:
192 kernel = self.get_box_kernel(axis, self.cell_arcsec[1])
193 gridded = np.convolve(beam, kernel, mode='same')
194 gridded /= max(gridded)
195 result = np.convolve(gridded, sampling, mode='same')
196 result /= max(result)
197 # fwhm1 = findFWHM(axis,result)
199 # Gaussian fit to the result convolved with additional kernel
200 # to take into account raster pointing pattern.
201 # CAS-14608
202 # Note that the fitting could fail especially when the
203 # pointing pattern is peculiar due to hardware/software problem
204 # related to the antenna. In that case, the FWHM estimate will
205 # fall back to theoretical beam size.
206 try:
207 fwhm1, _ = self.gaussfit(axis, result, minlevel=0.0, truncate=False)
208 fwhm_geo_arcsec = np.sqrt(fwhm_arcsec * fwhm1)
209 casalog.post("The second axis")
210 casalog.post("- FWHM of gridding kernel = %f arcsec" %
211 findFWHM(axis, kernel))
212 casalog.post("- FWHM of theoretical beam = %f arcsec" %
213 findFWHM(axis, result))
214 casalog.post("- FWHM of theoretical beam (gauss fit) = %f arcsec" %
215 fwhm1)
216 except Exception as e:
217 casalog.post(
218 "Gaussian fit to get effective beam size failed"
219 f" with the following error: {e} \n"
220 "Falling back to the theoretical beam size"
221 " derived from the antenna beam pattern.",
222 priority="WARN"
223 )
224 fwhm_geo_arcsec = fwhm_arcsec
225 finally:
226 del result
227 # clear-up axes
228 del axis, beam, kernel, sampling, gridded
230 return dict(major="%farcsec" % (fwhm_geo_arcsec),
231 minor="%farcsec" % (fwhm_geo_arcsec),
232 pa=self.pa)
234 def __assert_antenna(self):
235 """Raise an error if antenna information is not set."""
236 if not self.is_antenna_set:
237 raise RuntimeError("Antenna is not set")
239 def __assert_kernel(self):
240 """Raise an error if imaging parameters are not set."""
241 if not self.is_kernel_set:
242 raise RuntimeError("Kernel is not set.")
244 def __assert_sampling(self):
245 """Raise an error if sampling information is not set."""
246 if not self.is_sampling_set:
247 raise RuntimeError("Sampling information is not set")
249 def get_antenna_beam(self):
250 """Return arrays of antenna beam response and it's horizontal axis.
252 Picked from AnalysisUtils (revision 1.2204, 2015/02/18)
253 """
254 self.__assert_antenna()
255 self.__assert_kernel()
257 fwhm_arcsec = primaryBeamArcsec(frequency=self.ref_freq,
258 diameter=self.antenna_diam_m,
259 taper=self.taper, showEquation=True,
260 obscuration=self.antenna_block_m,
261 fwhmfactor=None)
262 truncate = False
263 convolutionPixelSize = 0.02 # arcsec
264 # avoid too coarse convolution array w.r.t. sampling
265 if self.is_sampling_set and \
266 min(self.sampling_arcsec) < 5 * convolutionPixelSize:
267 convolutionPixelSize = min(self.sampling_arcsec) / 5.0
268 # avoid too fine convolution arrays.
269 sizes = list(self.cell_arcsec) + [fwhm_arcsec]
270 if self.is_sampling_set:
271 sizes += list(self.sampling_arcsec)
272 minsize = min(sizes)
273 support_min = 1000.
274 support_fwhm = 5000.
275 if minsize > support_min * convolutionPixelSize:
276 convolutionPixelSize = minsize / support_min
277 if fwhm_arcsec > support_fwhm * convolutionPixelSize:
278 convolutionPixelSize = min(fwhm_arcsec / support_fwhm, minsize / 10.)
280 if (self.taper < 0.1):
281 # Airly disk
282 return self.buildAiryDisk(fwhm_arcsec, 3., convolutionPixelSize,
283 truncate=truncate,
284 obscuration=self.antenna_block_m,
285 diameter=self.antenna_diam_m)
286 else:
287 # Gaussian beam
288 myxaxis = np.arange(-3 * fwhm_arcsec,
289 3 * fwhm_arcsec + 0.5 * convolutionPixelSize,
290 convolutionPixelSize)
291 myfunction = self.gauss(myxaxis, [fwhm_arcsec, truncate])
292 return myxaxis, myfunction
294 def get_kernel(self, axis):
295 """Return imaging kernel array."""
296 self.__assert_kernel()
297 if self.kernel_type == "SF":
298 # TODO: what to do for cell[0]!=cell[1]???
299 return self.get_sf_kernel(axis, self.kernel_param['convsupport'],
300 self.cell_arcsec[0])
301 elif self.kernel_type == "GJINC":
302 return self.get_gjinc_kernel(axis, self.kernel_param['truncate'],
303 self.kernel_param['gwidth'],
304 self.kernel_param['jwidth'],
305 self.cell_arcsec[0])
306 elif self.kernel_type == "GAUSS":
307 return self.get_gauss_kernel(axis, self.kernel_param['truncate'],
308 self.kernel_param['gwidth'],
309 self.cell_arcsec[0])
310 elif self.kernel_type == "BOX":
311 return self.get_box_kernel(axis, self.kernel_param['width'])
312 elif self.kernel_type == "PB":
313 diam = self.antenna_diam_m
314 if self.kernel_param['alma']:
315 diam = 10.7
316 casalog.post(
317 f"Using effective antenna diameter {diam}m for "
318 f"{self.kernel_type} kernel of ALMA antennas")
319 epsilon = self.antenna_block_m / diam
320 return self.get_pb_kernel(axis, diam, self.ref_freq, epsilon=epsilon)
321 # return (self.rootAiryIntensity(axis, epsilon))**2
322 else:
323 raise RuntimeError("Invalid kernel: %s" % self.kernel_type)
325 def summary(self):
326 """Print summary of parameters set to the class."""
327 casalog.post("=" * 40)
328 casalog.post("Summary of Image Beam Parameters")
329 casalog.post("=" * 40)
330 casalog.post("[Antenna]")
331 self.__antenna_summary()
333 casalog.post("\n[Imaging Parameters]")
334 self.__kernel_summary()
336 casalog.post("\n[Sampling]")
337 self.__sampling_summary()
339 def __notset_message(self):
340 casalog.post("Not set.")
342 def __antenna_summary(self):
343 """Print summary of antenna setup."""
344 if not self.is_antenna_set:
345 self.__notset_message()
346 return
347 casalog.post("diameter: %f m" % (self.antenna_diam_m))
348 casalog.post("blockage: %f m" % (self.antenna_block_m))
350 def __kernel_summary(self):
351 """Print summary of imaging parameter setup."""
352 if not self.is_kernel_set:
353 self.__notset_message()
354 return
355 casalog.post("reference frequency: %s" % str(self.ref_freq))
356 casalog.post("cell size: %s arcsec" % str(self.cell_arcsec))
357 casalog.post("kernel type: %s" % self.kernel_type)
358 for key, val in self.kernel_param.items():
359 casalog.post("%s: %s" % (key, str(val)))
361 def __sampling_summary(self):
362 """Print summary of sampling setup."""
363 if not self.is_sampling_set:
364 self.__notset_message()
365 return
366 casalog.post("sampling interval: %s arcsec" % str(self.sampling_arcsec))
367 casalog.post("position angle: %s" % (self.pa))
369 #
370 # Construct Kernel arrays
371 #
373 #
374 # BOX
375 #
376 def get_box_kernel(self, axis, width):
377 """Return a box kernel array with specified width.
379 axis: an array of xaxis values
380 width: kernel width
381 output array
382 out[i] = 1.0 (-width/2.0 <= x[i] <= width/2.0)
383 = 0.0 (else)
384 """
385 data = np.zeros(len(axis))
386 indices = np.where(abs(axis) <= width / 2.0)
387 data[indices] = 1.0
388 return data
390 #
391 # SF
392 #
393 def get_sf_kernel(self, axis, convsupport, cell_size):
394 """Return spheroidal kernel array.
396 axis: an array of xaxis values
397 convsupport: the truncation radius of SF kernel in pixel unit.
398 cell_size: image pixel size
400 Modified version of one in AnalysisUtils.sfBeamPredict (revision 1.2204, 2015/02/18)
401 """
402 convsupport = 3 if convsupport == -1 else convsupport
403 supportwidth = (convsupport * 1.0 + 0.0)
404 # value obtained by matching Fred's grdsf.f output with scipy(m=0,n=0)
405 c = 5.356 * np.pi / 2.0
406 sfaxis = axis / float(supportwidth * cell_size * 1.0)
407 indices = np.where(abs(sfaxis) < 1)[0]
408 centralRegion = sfaxis[indices]
409 centralRegionY = self.spheroidalWaveFunction(centralRegion, 0, 0, c, 1)
410 mysf = np.zeros(len(axis))
411 mysf[indices] += centralRegionY / max(centralRegionY)
412 return mysf
414 def spheroidalWaveFunction(self, x, m=0, n=0, c=0, alpha=0):
415 """Return spheroidal wave function.
417 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
418 """
419 if (type(x) != list and type(x) != np.ndarray):
420 returnScalar = True
421 x = [x]
422 else:
423 returnScalar = False
424 cv = scipy.special.pro_cv(m, n, c) # get the eigenvalue
425 result = scipy.special.pro_ang1_cv(m, n, c, cv, x)[0]
426 for i in range(len(x)):
427 nu = x[i] # (i-0.5*len(x))/(0.5*len(x)) # only true if x is symmetric about zero
428 result[i] *= (1 - nu**2)**alpha
429 # The peak of this function is about 10000 for m=0,n=0,c=6
430 if (returnScalar):
431 return result[0]
432 else:
433 return result
435 #
436 # GAUSS
437 #
438 def get_gauss_kernel(self, axis, truncate, gwidth, cell_arcsec):
439 """Return a gaussian kernel array.
441 axis : an array of xaxis values
442 truncate : truncation radius
443 NOTE definition is different from truncate in gauss()!
444 gwidth : kernel gwidth
445 cell_arcsec : image pixel size in unit of arcsec
446 """
447 if gwidth == -1:
448 gwidth = np.sqrt(np.log(2.0))
449 gwidth_arcsec = self.__parse_width(gwidth, cell_arcsec)
450 # get gauss for full axis
451 result = self.gauss(axis, [gwidth_arcsec])
452 # truncate kernel outside the truncation radius
453 if truncate == -1:
454 trunc_arcsec = gwidth_arcsec * 1.5
455 elif truncate is not None:
456 trunc_arcsec = self.__parse_width(truncate, cell_arcsec)
457 idx = np.where(abs(axis) > trunc_arcsec)
458 result[idx] = 0.
459 return result
461 def gauss(self, x, parameters):
462 """Compute the value of the Gaussian function.
464 Compute the value of the Gaussian function at the specified
465 location(s) with respect to the peak (which is assumed to be at x=0).
466 truncate: if not None, then set result to zero if below this value.
467 -Todd Hunter
468 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
469 """
470 if (type(parameters) != np.ndarray and type(parameters) != list):
471 parameters = np.array([parameters])
472 if (len(parameters) < 2):
473 parameters = np.array([parameters[0], 0])
474 fwhm = parameters[0]
475 x = np.asarray(x, dtype=np.float64)
476 sigma = fwhm / 2.3548201
477 result = np.exp(-(x**2 / (2.0 * sigma**2)))
478 idx = np.where(result < parameters[1])[0]
479 result[idx] = 0
480 return result
482 #
483 # GJINC
484 #
485 def get_gjinc_kernel(self, axis, truncate, gwidth, jwidth, cell_arcsec):
486 """Return a GJinc kernel array.
488 axis : an array of xaxis values
489 truncate : truncation radius (NOT SUPPORTED YET)
490 gwidth jwidth : kernel gwidth
491 cell_arcsec : image pixel size in unit of arcsec
492 """
493 if gwidth == -1:
494 gwidth = 2.52 * np.sqrt(np.log(2.0))
495 if jwidth == -1:
496 jwidth = 1.55
497 gwidth_arcsec = self.__parse_width(gwidth, cell_arcsec)
498 jwidth_arcsec = self.__parse_width(jwidth, cell_arcsec)
499 mygjinc = self.trunc(self.gjinc(axis, gwidth=gwidth_arcsec,
500 jwidth=jwidth_arcsec,
501 useCasaJinc=True, normalize=False))
502 return mygjinc
504 def gjinc(self, x, gwidth, jwidth, useCasaJinc=False, normalize=False):
505 """Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)."""
506 if (useCasaJinc):
507 result = self.grdjinc1(x, jwidth, normalize) * self.gjincGauss(x, gwidth)
508 else:
509 result = self.jinc(x, jwidth) * self.gjincGauss(x, gwidth)
510 return result
512 def grdjinc1(self, val, c, normalize=True):
513 """Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)."""
514 # Casa's function
515 # // Calculate J_1(x) using approximate formula
516 xs = np.pi * val / c
517 result = []
518 for x in xs:
519 x = abs(x) # I added this to make it symmetric
520 ax = abs(x)
521 if (ax < 8.0):
522 y = x * x
523 ans1 = x * (72362614232.0 + y *
524 (-7895059235.0 + y * (242396853.1 + y *
525 (-2972611.439 + y * (15704.48260 + y * (-30.16036606))))))
526 ans2 = 144725228442.0 + y * (2300535178.0 + y *
527 (18583304.74 + y * (99447.43394 + y *
528 (376.9991397 + y * 1.0))))
529 ans = ans1 / ans2
530 else:
531 z = 8.0 / ax
532 y = z * z
533 xx = ax - 2.356194491
534 ans1 = 1.0 + y * (0.183105e-2 + y *
535 (-0.3516396496e-4 + y *
536 (0.2457520174e-5 + y * (-0.240337019e-6))))
537 ans2 = 0.04687499995 + y * (-0.2002690873e-3 + y *
538 (0.8449199096e-5 + y *
539 (-0.88228987e-6 + y * (0.105787412e-6))))
540 ans = sqrt(0.636619772 / ax) * (cos(xx) * ans1 - z * sin(xx) * ans2)
541 if (x < 0.0):
542 ans = -ans
543 if (x == 0.0):
544 out = 0.5
545 else:
546 out = ans / x
547 if (normalize):
548 out = out / 0.5
549 result.append(out)
550 return(result)
552 def jinc(self, x, jwidth):
553 """Compute JINC value.
555 The peak of this function is 0.5.
556 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
557 """
558 argument = np.pi * np.abs(x) / jwidth
559 np.seterr(invalid='ignore') # prevent warning for central point
560 result = scipy.special.j1(argument) / argument
561 np.seterr(invalid='warn')
562 for i in range(len(x)):
563 if (abs(x[i]) < 1e-8):
564 result[i] = 0.5
565 return result
567 def gjincGauss(self, x, gwidth):
568 return (np.exp(-np.log(2) * (x / float(gwidth))**2))
570 #
571 # Airly disk
572 #
573 def get_pb_kernel(self, axis, diam, ref_freq, epsilon=0.0):
574 """Return Airy Disk array.
576 Return Airy Disk array defined by the axis, diameter, reference frequency
577 and ratio of central hole and antenna diameter
579 axis: x-axis values
580 diameter: antenna diameter in unit of m
581 reference frequency: the reference frequency of the image
582 epsilon: ratio of central hole diameter to antenna diameter
583 """
584 a = (self.rootAiryIntensity(axis, epsilon))**2
585 airyfwhm = findFWHM(axis, a)
586 fwhm = primaryBeamArcsec(frequency=ref_freq, diameter=diam,
587 taper=self.taper, showEquation=False,
588 obscuration=diam * epsilon,
589 fwhmfactor=None)
590 ratio = fwhm / airyfwhm
591 tempaxis = axis / ratio
592 a = self.rootAiryIntensity(tempaxis, epsilon)
593 return a**2
595 def buildAiryDisk(self, fwhm, xaxisLimitInUnitsOfFwhm, convolutionPixelSize,
596 truncate=False, obscuration=0.75, diameter=12.0):
597 """Build airy disk.
599 This function computes the Airy disk (with peak of 1.0) across a grid of points
600 specified in units of the FWHM of the disk.
601 fwhm: a value in arcsec
602 xaxisLimitInUnitsOfFwhm: an integer or floating point unitless value
603 truncate: if True, truncate the function at the first null (on both sides)
604 obscuration: central hole diameter (meters)
605 diameter: dish diameter (meters)
606 obscuration and diameter are used to compute the blockage ratio (epsilon)
607 and its effect on the pattern
608 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
609 """
610 epsilon = obscuration / diameter
611 # casalog.post("Using epsilon = %f" % (epsilon))
612 myxaxis = np.arange(-xaxisLimitInUnitsOfFwhm * fwhm,
613 xaxisLimitInUnitsOfFwhm * fwhm + 0.5 * convolutionPixelSize,
614 convolutionPixelSize)
615 a = (self.rootAiryIntensity(myxaxis, epsilon))**2
616 # Scale the Airy disk to the desired FWHM, and recompute on finer grid
617 airyfwhm = findFWHM(myxaxis, a)
618 ratio = fwhm / airyfwhm
619 myxaxis = np.arange(-xaxisLimitInUnitsOfFwhm * fwhm / ratio,
620 (xaxisLimitInUnitsOfFwhm * fwhm + 0.5 * convolutionPixelSize) / ratio,
621 convolutionPixelSize / ratio)
622 a = self.rootAiryIntensity(myxaxis, epsilon)
623 if (truncate):
624 a = self.trunc(a)
625 a = a**2
626 myxaxis *= ratio
627 return(myxaxis, a)
629 def rootAiryIntensity(self, myxaxis, epsilon=0.0, showplot=False):
630 """Compute the value of 2*J1(x)/x.
632 This function computes 2*J1(x)/x, which can be squared to get an Airy disk.
633 myxaxis: the x-axis values to use
634 epsilon: radius of central hole in units of the dish diameter
635 Migrated from AnalysisUtils.py (revision 1.2204, 2015/02/18)
636 """
637 if (epsilon > 0):
638 a = (2 * spspec.j1(myxaxis) / myxaxis -
639 epsilon**2 * 2 * spspec.j1(myxaxis * epsilon) /
640 (epsilon * myxaxis)) / (1 - epsilon**2)
641 else:
642 a = 2 * spspec.j1(myxaxis) / myxaxis # simpler formula for epsilon=0
643 return(a)
645 def trunc(self, result):
646 """Truncate a list at the first null.
648 Truncates a list at the first null on both sides of the center,
649 starting at the center and moving outward in each direction.
650 Assumes the list is positive in the center, e.g. a Gaussian beam.
651 -Todd Hunter
652 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
653 """
654 # casa default truncate=-1, which means truncate at radius of first null
655 mask = np.zeros(len(result))
656 truncateBefore = 0
657 truncateBeyond = len(mask)
658 for r in range(len(result) // 2, len(result)):
659 if (result[r] < 0):
660 truncateBeyond = r
661 break
662 for r in range(len(result) // 2, 0, -1):
663 if (result[r] < 0):
664 truncateBefore = r
665 break
666 mask[truncateBefore:truncateBeyond] = 1
667 # casalog.post(
668 # "Truncating outside of pixels %d-%d (len=%d)" %
669 # (truncateBefore,truncateBeyond-1,len(mask)))
670 result *= mask
671 return result
673 def gaussfit_errfunc(self, parameters, x, y):
674 """Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)."""
675 return (y - self.gauss(x, parameters))
677 def gaussfit(self, x, y, showplot=False, minlevel=0, verbose=False,
678 title=None, truncate=False):
679 """Perform 1D Gaussian fit.
681 Fits a 1D Gaussian assumed to be centered at x=0 with amp=1 to the
682 specified data, with an option to truncate it at some level.
683 Returns the FWHM and truncation point.
684 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
685 """
686 fwhm_guess = findFWHM(x, y)
687 if (truncate == False):
688 parameters = np.asarray([fwhm_guess], dtype=np.float64)
689 else:
690 parameters = np.asarray([fwhm_guess, truncate], dtype=np.float64)
691 if (verbose):
692 casalog.post("Fitting for %d parameters: guesses = %s" % (len(parameters), parameters))
693 xx = np.asarray(x, dtype=np.float64)
694 yy = np.asarray(y, dtype=np.float64)
695 lenx = len(x)
696 if (minlevel > 0):
697 xwidth = findFWHM(x, y, minlevel)
698 xx = x[np.where(np.abs(x) < xwidth * 0.5)[0]]
699 yy = y[np.where(np.abs(x) < xwidth * 0.5)[0]]
700 if (verbose):
701 casalog.postt("Keeping %d/%d points, guess = %f arcsec" %
702 (len(x), lenx, fwhm_guess))
703 result = optimize.leastsq(self.gaussfit_errfunc, parameters, args=(xx, yy),
704 full_output=1)
705 bestParameters = result[0]
706 infodict = result[2]
707 numberFunctionCalls = infodict['nfev']
708 mesg = result[3]
709 ier = result[4]
710 if (verbose):
711 casalog.post("optimize.leastsq: ier=%d, #calls=%d, message = %s" %
712 (ier, numberFunctionCalls, mesg))
713 if (type(bestParameters) == list or type(bestParameters) == np.ndarray):
714 fwhm = bestParameters[0]
715 if verbose:
716 casalog.post("fitted FWHM = %f" % (fwhm))
717 if (truncate != False):
718 truncate = bestParameters[1]
719 casalog.post("optimized truncation = %f" % (truncate))
720 else:
721 fwhm = bestParameters
722 return(fwhm, truncate)
725def findFWHM(x, y, level=0.5, s=0):
726 """Measure the FWHM of the specified profile.
728 This works well in a noise-free environment.
729 The data are assumed to be sorted by the x variable.
730 x: the position variable
731 y: the intensity variable
732 level: the signal level for which to find the full width
733 s: see help scipy.interpolate.UnivariateSpline
734 -Todd Hunter
735 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
736 """
737 halfmax = np.max(y) * level
738 spline = scipy.interpolate.UnivariateSpline(x, y - halfmax, s=s)
739 result = spline.roots()
740 if (len(result) == 2):
741 x0, x1 = result
742 return(abs(x1 - x0))
743 elif (len(result) == 1):
744 return(2 * abs(result[0]))
745 else:
746 # modified (KS 2015/02/19)
747 # casalog.post(
748 # f"More than two crossings ({len(result)}), "
749 # "fitting slope to points near that power level."
750 # )
751 # result = 2*findZeroCrossingBySlope(x, y-halfmax)
752 # return(result)
753 errmsg = "Unsupported FWHM search in CASA. " + \
754 f"More than two crossings ({len(result)}) at level {halfmax} ({level} % of peak)."
755 raise Exception(errmsg)
758def primaryBeamArcsec(frequency, diameter, obscuration, taper,
759 showEquation=True, use2007formula=True, fwhmfactor=None):
760 """Implement the Baars formula: b*lambda / D.
762 if use2007formula==False, use the formula from ALMA Memo 456
763 if use2007formula==True, use the formula from Baars 2007 book
764 (see au.baarsTaperFactor)
765 In either case, the taper value is expected to be entered as positive.
766 Note: if a negative value is entered, it is converted to positive.
767 The effect of the central obstruction on the pattern is also accounted for
768 by using a spline fit to Table 10.1 of Schroeder's Astronomical Optics.
769 fwhmfactor: if given, then ignore the taper
770 For further help and examples, see https://safe.nrao.edu/wiki/bin/view/ALMA/PrimaryBeamArcsec
771 -- Todd Hunter
772 Simplified version of the one in AnalysisUtils (revision 1.2204, 2015/02/18)
773 """
774 if (fwhmfactor is not None):
775 taper = effectiveTaper(fwhmfactor, diameter, obscuration, use2007formula)
776 if (taper is None):
777 return
778 if (taper < 0):
779 taper = abs(taper)
780 if (obscuration > 0.4 * diameter):
781 casalog.post(
782 "This central obscuration is too large for the method of calculation employed here.")
783 return
784 if (type(frequency) == str):
785 my_qa = quanta()
786 frequency = my_qa.getvalue(my_qa.convert(frequency, "Hz"))[0]
787 lambdaMeters = 2.99792458e8 / frequency
788 b = baarsTaperFactor(taper, use2007formula) * centralObstructionFactor(diameter, obscuration)
789 if (showEquation):
790 if (use2007formula):
791 formula = "Baars (2007) Eq 4.13"
792 else:
793 formula = "ALMA memo 456 Eq. 18"
794 casalog.post(
795 f"Coefficient from {formula} for a -{taper:.1f}dB edge taper "
796 f"and obscuration ratio={obscuration:g}/{diameter:g} = {b:.3f}*lambda/D")
797 return(b * lambdaMeters * 3600 * 180 / (diameter * np.pi))
800def effectiveTaper(fwhmFactor=1.16, diameter=12, obscuration=0.75,
801 use2007formula=True):
802 """Compute effective taper from constant factor for illumination pattern.
804 The inverse of (Baars formula multiplied by the central
805 obstruction factor). Converts an observed value of the constant X in
806 the formula FWHM=X*lambda/D into a taper in dB (positive value).
807 if use2007formula == False, use Equation 18 from ALMA Memo 456
808 if use2007formula == True, use Equation 4.13 from Baars 2007 book
809 -- Todd Hunter
810 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
811 """
812 cOF = centralObstructionFactor(diameter, obscuration)
813 if (fwhmFactor < 1.02 or fwhmFactor > 1.22):
814 casalog.post("Invalid fwhmFactor (1.02<fwhmFactor<1.22)")
815 return
816 if (baarsTaperFactor(10, use2007formula) * cOF < fwhmFactor):
817 increment = 0.01
818 for taper_dB in np.arange(10, 10 + increment * 1000, increment):
819 if (baarsTaperFactor(taper_dB, use2007formula) * cOF - fwhmFactor > 0):
820 break
821 else:
822 increment = -0.01
823 for taper_dB in np.arange(10, 10 + increment * 1000, increment):
824 if (baarsTaperFactor(taper_dB, use2007formula) * cOF - fwhmFactor < 0):
825 break
826 return(taper_dB)
829def baarsTaperFactor(taper_dB, use2007formula=True):
830 """Convert a taper to the constant factor for illumination pattern.
832 Converts a taper in dB to the constant X
833 in the formula FWHM=X*lambda/D for the parabolic illumination pattern.
834 We assume that taper_dB comes in as a positive value.
835 use2007formula: False --> use Equation 18 from ALMA Memo 456.
836 True --> use Equation 4.13 from Baars 2007 book
837 - Todd Hunter
838 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
839 """
840 tau = 10**(-0.05 * taper_dB)
841 if (use2007formula):
842 return(1.269 - 0.566 * tau + 0.534 * (tau**2) - 0.208 * (tau**3))
843 else:
844 return(1.243 - 0.343 * tau + 0.12 * (tau**2))
847def centralObstructionFactor(diameter=12.0, obscuration=0.75):
848 """Compute the scale factor of an Airy pattern.
850 Computes the scale factor of an Airy pattern as a function of the
851 central obscuration, using Table 10.1 of Schroeder's 'Astronomical Optics'.
852 -- Todd Hunter
853 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
854 """
855 epsilon = obscuration / diameter
856 myspline = scipy.interpolate.UnivariateSpline(
857 [0, 0.1, 0.2, 0.33, 0.4], [1.22, 1.205, 1.167, 1.098, 1.058], s=0)
858 factor = myspline(epsilon) / 1.22
859 if (type(factor) == np.float64):
860 # casapy 4.2
861 return(factor)
862 else:
863 # casapy 4.1 and earlier
864 return(factor[0])