Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/sdbeamutil.py: 80%
439 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
2import numpy as np
3from numpy import cos, sin, sqrt
4import scipy.interpolate
5import scipy.optimize as optimize
6import scipy.signal
7import scipy.special as spspec
9from casatasks import casalog
10from casatools import quanta
13##################################################
14# Prediction of theoretical beam size
15##################################################
16class TheoreticalBeam:
17 """
18 The class to derive the theoretical beam size of an image.
20 Example:
21 bu = sdbeamutil.TheoreticalBeam()
22 # set imaging, antenna, and pointing informations
23 bu.set_antenna('12m',blockage='0.75m',taper=10)
24 bu.set_sampling(['12arcsec','12arcsec'])
25 bu.set_image_param('5arcsec', '115GHz','SF', 6, -1, -1, -1)
26 # print summary of setup to logger
27 bu.summary()
28 # get theoretical beam size of an image.
29 beam = bu.get_beamsize_image()
30 """
32 def __init__(self):
33 self.is_antenna_set = False
34 self.is_kernel_set = False
35 self.is_sampling_set = False
36 self.antenna_diam_m = -1.
37 self.antenna_block_m = 0.0
38 self.taper = 10
39 self.ref_freq = -1.
40 self.kernel_type = ""
41 self.kernel_param = {}
42 self.pa = "0.0deg"
43 self.sampling_arcsec = []
44 self.cell_arcsec = []
46 def __to_arcsec_list(self, angle):
47 """Return a list of angles in arcsec (value only without unit)."""
48 if type(angle) not in [list, tuple, np.ndarray]:
49 angle = [angle]
50 return [self.__to_arcsec(val) for val in angle]
52 def __to_arcsec(self, angle):
53 """Convert angle to arcsec and return the value without unit."""
54 my_qa = quanta()
55 if my_qa.isangle(angle):
56 return my_qa.getvalue(my_qa.convert(angle, "arcsec"))[0]
57 elif my_qa.getunit(angle) == '':
58 return float(angle)
59 else:
60 raise ValueError("Invalid angle: %s" % (str(angle)))
62 def __parse_width(self, val, cell_size_arcsec):
63 """Convert value in unit of arcsec.
65 If val is angle, returns a float value in unit of arcsec.
66 else the unit is assumed to be pixel and multiplied by cell_size_arcsec
67 """
68 my_qa = quanta()
69 if my_qa.isangle(val):
70 return self.__to_arcsec(val)
71 elif my_qa.getunit(val) in ('', 'pixel'):
72 return my_qa.getvalue(val)[0] * cell_size_arcsec
73 else:
74 raise ValueError("Invalid width %s" % str(val))
76 def set_antenna(self, diam, blockage="0.0m", taper=10):
77 """Set parameters to construct antenna beam.
79 antenna diameter and blockage
80 taper: the illumination taper in dB
81 """
82 # try quantity
83 my_qa = quanta()
84 self.antenna_diam_m = my_qa.getvalue(my_qa.convert(diam, "m"))[0]
85 self.antenna_block_m = my_qa.getvalue(my_qa.convert(blockage, "m"))[0]
86 self.taper = taper
87 self.is_antenna_set = True
89 def set_sampling(self, intervals, pa="0deg"):
90 """Set sampling interval of observation.
92 intervals: pointing inteval of observation (['10arcsec','10arcsec'])
93 pa: position angle (NOT USED)
94 """
95 self.pa = pa
96 self.sampling_arcsec = [abs(a) for a in
97 self.__to_arcsec_list(intervals)]
98 self.is_sampling_set = True
100 def set_image_param(self, cell, ref_freq, gridfunction,
101 convsupport, truncate, gwidth, jwidth, is_alma=False):
102 """Set imaging parameters.
104 cell: image pixel size
105 ref_freq: reference frequency of image to calculate beam size
106 gridfunction, convsupport, truncate, gwidth, jwidth:
107 parameters passed to imager
108 is_alma: valid only for PB kernel to use 10.7m
109 """
110 self.ref_freq = ref_freq
111 self.cell_arcsec = [abs(a) for a in
112 self.__to_arcsec_list(cell)]
113 if gridfunction.upper() == "SF":
114 self.__set_sf_kernel(convsupport)
115 elif gridfunction.upper() == "GJINC":
116 self.__set_gjinc_kernel(truncate, gwidth, jwidth)
117 elif gridfunction.upper() == "GAUSS":
118 self.__set_gauss_kernel(truncate, gwidth)
119 elif gridfunction.upper() == "BOX":
120 self.__set_box_kernel(self.cell_arcsec[0])
121 elif gridfunction.upper() == "PB":
122 self.__set_pb_kernel(is_alma)
123 self.is_kernel_set = True
125 def __set_sf_kernel(self, convsupport):
126 """Set SF kernel parameter to the class."""
127 self.kernel_type = "SF"
128 self.kernel_param = dict(convsupport=convsupport)
130 def __set_gjinc_kernel(self, truncate, gwidth, jwidth):
131 """Set GJINC kernel parameter to the class."""
132 self.kernel_type = "GJINC"
133 self.kernel_param = dict(truncate=truncate, gwidth=gwidth, jwidth=jwidth)
135 def __set_gauss_kernel(self, truncate, gwidth):
136 """Set GAUSS kernel parameter to the class."""
137 self.kernel_type = "GAUSS"
138 self.kernel_param = dict(truncate=truncate, gwidth=gwidth)
140 def __set_box_kernel(self, width):
141 """Set BOX kernel parameter to the class."""
142 self.kernel_type = "BOX"
143 self.kernel_param = dict(width=width)
145 def __set_pb_kernel(self, alma=False):
146 """Set PB kernel parameter to the class."""
147 self.kernel_type = "PB"
148 self.kernel_param = dict(alma=alma)
150 def get_beamsize_image(self):
151 """Return FWHM of theoretical beam size in image.
153 The FWHM is derived by fitting gridded beam with Gaussian function.
154 """
155 # assert necessary information is set
156 self.__assert_antenna()
157 self.__assert_kernel()
158 self.__assert_sampling()
159 casalog.post("Calculating theoretical beam size of the image")
160 # construct theoretic beam for image
161 axis, beam = self.get_antenna_beam()
162 casalog.post(
163 f"Length of convolution array={len(axis)}, "
164 f"total width={axis[-1] - axis[0]} arcsec, "
165 f"separation={axis[1] - axis[0]} arcsec",
166 priority="DEBUG1")
167 kernel = self.get_kernel(axis)
168 sampling = self.get_box_kernel(axis, self.sampling_arcsec[0])
169 # convolution
170 gridded = np.convolve(beam, kernel, mode='same')
171 gridded /= max(gridded)
172 result = np.convolve(gridded, sampling, mode='same')
173 result /= max(result)
174 # fwhm_arcsec = findFWHM(axis,result)
175 fwhm_arcsec, dummy = self.gaussfit(axis, result, minlevel=0.0, truncate=False)
176 # DEBUG MESSAGE
177 casalog.post("- initial FWHM of beam = %f arcsec" %
178 findFWHM(axis, beam))
179 casalog.post("- FWHM of gridding kernel = %f arcsec" %
180 findFWHM(axis, kernel))
181 casalog.post("- FWHM of theoretical beam = %f arcsec" %
182 findFWHM(axis, result))
183 casalog.post("- FWHM of theoretical beam (gauss fit) = %f arcsec" %
184 fwhm_arcsec)
185 ###
186 del result
187 if len(self.sampling_arcsec) == 1 and \
188 (len(self.cell_arcsec) == 1 or self.kernel_type != "BOX"):
189 fwhm_geo_arcsec = fwhm_arcsec
190 else:
191 if len(self.sampling_arcsec) > 1:
192 sampling = self.get_box_kernel(axis, self.sampling_arcsec[1])
193 elif self.kernel_type == "BOX" and len(self.cell_arcsec) > 1:
194 kernel = self.get_box_kernel(axis, self.cell_arcsec[1])
195 gridded = np.convolve(beam, kernel, mode='same')
196 gridded /= max(gridded)
197 result = np.convolve(gridded, sampling, mode='same')
198 result /= max(result)
199 # fwhm1 = findFWHM(axis,result)
200 fwhm1, _ = self.gaussfit(axis, result, minlevel=0.0, truncate=False)
201 fwhm_geo_arcsec = np.sqrt(fwhm_arcsec * fwhm1)
202 # DEBUG MESSAGE
203 casalog.post("The second axis")
204 casalog.post("- FWHM of gridding kernel = %f arcsec" %
205 findFWHM(axis, kernel))
206 casalog.post("- FWHM of theoretical beam = %f arcsec" %
207 findFWHM(axis, result))
208 casalog.post("- FWHM of theoretical beam (gauss fit) = %f arcsec" %
209 fwhm1)
210 del result
211 # clear-up axes
212 del axis, beam, kernel, sampling, gridded
214 return dict(major="%farcsec" % (fwhm_geo_arcsec),
215 minor="%farcsec" % (fwhm_geo_arcsec),
216 pa=self.pa)
218 def __assert_antenna(self):
219 """Raise an error if antenna information is not set."""
220 if not self.is_antenna_set:
221 raise RuntimeError("Antenna is not set")
223 def __assert_kernel(self):
224 """Raise an error if imaging parameters are not set."""
225 if not self.is_kernel_set:
226 raise RuntimeError("Kernel is not set.")
228 def __assert_sampling(self):
229 """Raise an error if sampling information is not set."""
230 if not self.is_sampling_set:
231 raise RuntimeError("Sampling information is not set")
233 def get_antenna_beam(self):
234 """Return arrays of antenna beam response and it's horizontal axis.
236 Picked from AnalysisUtils (revision 1.2204, 2015/02/18)
237 """
238 self.__assert_antenna()
239 self.__assert_kernel()
241 fwhm_arcsec = primaryBeamArcsec(frequency=self.ref_freq,
242 diameter=self.antenna_diam_m,
243 taper=self.taper, showEquation=True,
244 obscuration=self.antenna_block_m,
245 fwhmfactor=None)
246 truncate = False
247 convolutionPixelSize = 0.02 # arcsec
248 # avoid too coarse convolution array w.r.t. sampling
249 if self.is_sampling_set and \
250 min(self.sampling_arcsec) < 5 * convolutionPixelSize:
251 convolutionPixelSize = min(self.sampling_arcsec) / 5.0
252 # avoid too fine convolution arrays.
253 sizes = list(self.cell_arcsec) + [fwhm_arcsec]
254 if self.is_sampling_set:
255 sizes += list(self.sampling_arcsec)
256 minsize = min(sizes)
257 support_min = 1000.
258 support_fwhm = 5000.
259 if minsize > support_min * convolutionPixelSize:
260 convolutionPixelSize = minsize / support_min
261 if fwhm_arcsec > support_fwhm * convolutionPixelSize:
262 convolutionPixelSize = min(fwhm_arcsec / support_fwhm, minsize / 10.)
264 if (self.taper < 0.1):
265 # Airly disk
266 return self.buildAiryDisk(fwhm_arcsec, 3., convolutionPixelSize,
267 truncate=truncate,
268 obscuration=self.antenna_block_m,
269 diameter=self.antenna_diam_m)
270 else:
271 # Gaussian beam
272 myxaxis = np.arange(-3 * fwhm_arcsec,
273 3 * fwhm_arcsec + 0.5 * convolutionPixelSize,
274 convolutionPixelSize)
275 myfunction = self.gauss(myxaxis, [fwhm_arcsec, truncate])
276 return myxaxis, myfunction
278 def get_kernel(self, axis):
279 """Return imaging kernel array."""
280 self.__assert_kernel()
281 if self.kernel_type == "SF":
282 # TODO: what to do for cell[0]!=cell[1]???
283 return self.get_sf_kernel(axis, self.kernel_param['convsupport'],
284 self.cell_arcsec[0])
285 elif self.kernel_type == "GJINC":
286 return self.get_gjinc_kernel(axis, self.kernel_param['truncate'],
287 self.kernel_param['gwidth'],
288 self.kernel_param['jwidth'],
289 self.cell_arcsec[0])
290 elif self.kernel_type == "GAUSS":
291 return self.get_gauss_kernel(axis, self.kernel_param['truncate'],
292 self.kernel_param['gwidth'],
293 self.cell_arcsec[0])
294 elif self.kernel_type == "BOX":
295 return self.get_box_kernel(axis, self.kernel_param['width'])
296 elif self.kernel_type == "PB":
297 diam = self.antenna_diam_m
298 if self.kernel_param['alma']:
299 diam = 10.7
300 casalog.post(
301 f"Using effective antenna diameter {diam}m for "
302 f"{self.kernel_type} kernel of ALMA antennas")
303 epsilon = self.antenna_block_m / diam
304 return self.get_pb_kernel(axis, diam, self.ref_freq, epsilon=epsilon)
305 # return (self.rootAiryIntensity(axis, epsilon))**2
306 else:
307 raise RuntimeError("Invalid kernel: %s" % self.kernel_type)
309 def summary(self):
310 """Print summary of parameters set to the class."""
311 casalog.post("=" * 40)
312 casalog.post("Summary of Image Beam Parameters")
313 casalog.post("=" * 40)
314 casalog.post("[Antenna]")
315 self.__antenna_summary()
317 casalog.post("\n[Imaging Parameters]")
318 self.__kernel_summary()
320 casalog.post("\n[Sampling]")
321 self.__sampling_summary()
323 def __notset_message(self):
324 casalog.post("Not set.")
326 def __antenna_summary(self):
327 """Print summary of antenna setup."""
328 if not self.is_antenna_set:
329 self.__notset_message()
330 return
331 casalog.post("diameter: %f m" % (self.antenna_diam_m))
332 casalog.post("blockage: %f m" % (self.antenna_block_m))
334 def __kernel_summary(self):
335 """Print summary of imaging parameter setup."""
336 if not self.is_kernel_set:
337 self.__notset_message()
338 return
339 casalog.post("reference frequency: %s" % str(self.ref_freq))
340 casalog.post("cell size: %s arcsec" % str(self.cell_arcsec))
341 casalog.post("kernel type: %s" % self.kernel_type)
342 for key, val in self.kernel_param.items():
343 casalog.post("%s: %s" % (key, str(val)))
345 def __sampling_summary(self):
346 """Print summary of sampling setup."""
347 if not self.is_sampling_set:
348 self.__notset_message()
349 return
350 casalog.post("sampling interval: %s arcsec" % str(self.sampling_arcsec))
351 casalog.post("position angle: %s" % (self.pa))
353 #
354 # Construct Kernel arrays
355 #
357 #
358 # BOX
359 #
360 def get_box_kernel(self, axis, width):
361 """Return a box kernel array with specified width.
363 axis: an array of xaxis values
364 width: kernel width
365 output array
366 out[i] = 1.0 (-width/2.0 <= x[i] <= width/2.0)
367 = 0.0 (else)
368 """
369 data = np.zeros(len(axis))
370 indices = np.where(abs(axis) <= width / 2.0)
371 data[indices] = 1.0
372 return data
374 #
375 # SF
376 #
377 def get_sf_kernel(self, axis, convsupport, cell_size):
378 """Return spheroidal kernel array.
380 axis: an array of xaxis values
381 convsupport: the truncation radius of SF kernel in pixel unit.
382 cell_size: image pixel size
384 Modified version of one in AnalysisUtils.sfBeamPredict (revision 1.2204, 2015/02/18)
385 """
386 convsupport = 3 if convsupport == -1 else convsupport
387 supportwidth = (convsupport * 1.0 + 0.0)
388 # value obtained by matching Fred's grdsf.f output with scipy(m=0,n=0)
389 c = 5.356 * np.pi / 2.0
390 sfaxis = axis / float(supportwidth * cell_size * 1.0)
391 indices = np.where(abs(sfaxis) < 1)[0]
392 centralRegion = sfaxis[indices]
393 centralRegionY = self.spheroidalWaveFunction(centralRegion, 0, 0, c, 1)
394 mysf = np.zeros(len(axis))
395 mysf[indices] += centralRegionY / max(centralRegionY)
396 return mysf
398 def spheroidalWaveFunction(self, x, m=0, n=0, c=0, alpha=0):
399 """Return spheroidal wave function.
401 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
402 """
403 if (type(x) != list and type(x) != np.ndarray):
404 returnScalar = True
405 x = [x]
406 else:
407 returnScalar = False
408 cv = scipy.special.pro_cv(m, n, c) # get the eigenvalue
409 result = scipy.special.pro_ang1_cv(m, n, c, cv, x)[0]
410 for i in range(len(x)):
411 nu = x[i] # (i-0.5*len(x))/(0.5*len(x)) # only true if x is symmetric about zero
412 result[i] *= (1 - nu**2)**alpha
413 # The peak of this function is about 10000 for m=0,n=0,c=6
414 if (returnScalar):
415 return result[0]
416 else:
417 return result
419 #
420 # GAUSS
421 #
422 def get_gauss_kernel(self, axis, truncate, gwidth, cell_arcsec):
423 """Return a gaussian kernel array.
425 axis : an array of xaxis values
426 truncate : truncation radius
427 NOTE definition is different from truncate in gauss()!
428 gwidth : kernel gwidth
429 cell_arcsec : image pixel size in unit of arcsec
430 """
431 if gwidth == -1:
432 gwidth = np.sqrt(np.log(2.0))
433 gwidth_arcsec = self.__parse_width(gwidth, cell_arcsec)
434 # get gauss for full axis
435 result = self.gauss(axis, [gwidth_arcsec])
436 # truncate kernel outside the truncation radius
437 if truncate == -1:
438 trunc_arcsec = gwidth_arcsec * 1.5
439 elif truncate is not None:
440 trunc_arcsec = self.__parse_width(truncate, cell_arcsec)
441 idx = np.where(abs(axis) > trunc_arcsec)
442 result[idx] = 0.
443 return result
445 def gauss(self, x, parameters):
446 """Compute the value of the Gaussian function.
448 Compute the value of the Gaussian function at the specified
449 location(s) with respect to the peak (which is assumed to be at x=0).
450 truncate: if not None, then set result to zero if below this value.
451 -Todd Hunter
452 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
453 """
454 if (type(parameters) != np.ndarray and type(parameters) != list):
455 parameters = np.array([parameters])
456 if (len(parameters) < 2):
457 parameters = np.array([parameters[0], 0])
458 fwhm = parameters[0]
459 x = np.asarray(x, dtype=np.float64)
460 sigma = fwhm / 2.3548201
461 result = np.exp(-(x**2 / (2.0 * sigma**2)))
462 idx = np.where(result < parameters[1])[0]
463 result[idx] = 0
464 return result
466 #
467 # GJINC
468 #
469 def get_gjinc_kernel(self, axis, truncate, gwidth, jwidth, cell_arcsec):
470 """Return a GJinc kernel array.
472 axis : an array of xaxis values
473 truncate : truncation radius (NOT SUPPORTED YET)
474 gwidth jwidth : kernel gwidth
475 cell_arcsec : image pixel size in unit of arcsec
476 """
477 if gwidth == -1:
478 gwidth = 2.52 * np.sqrt(np.log(2.0))
479 if jwidth == -1:
480 jwidth = 1.55
481 gwidth_arcsec = self.__parse_width(gwidth, cell_arcsec)
482 jwidth_arcsec = self.__parse_width(jwidth, cell_arcsec)
483 mygjinc = self.trunc(self.gjinc(axis, gwidth=gwidth_arcsec,
484 jwidth=jwidth_arcsec,
485 useCasaJinc=True, normalize=False))
486 return mygjinc
488 def gjinc(self, x, gwidth, jwidth, useCasaJinc=False, normalize=False):
489 """Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)."""
490 if (useCasaJinc):
491 result = self.grdjinc1(x, jwidth, normalize) * self.gjincGauss(x, gwidth)
492 else:
493 result = self.jinc(x, jwidth) * self.gjincGauss(x, gwidth)
494 return result
496 def grdjinc1(self, val, c, normalize=True):
497 """Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)."""
498 # Casa's function
499 # // Calculate J_1(x) using approximate formula
500 xs = np.pi * val / c
501 result = []
502 for x in xs:
503 x = abs(x) # I added this to make it symmetric
504 ax = abs(x)
505 if (ax < 8.0):
506 y = x * x
507 ans1 = x * (72362614232.0 + y *
508 (-7895059235.0 + y * (242396853.1 + y *
509 (-2972611.439 + y * (15704.48260 + y * (-30.16036606))))))
510 ans2 = 144725228442.0 + y * (2300535178.0 + y *
511 (18583304.74 + y * (99447.43394 + y *
512 (376.9991397 + y * 1.0))))
513 ans = ans1 / ans2
514 else:
515 z = 8.0 / ax
516 y = z * z
517 xx = ax - 2.356194491
518 ans1 = 1.0 + y * (0.183105e-2 + y *
519 (-0.3516396496e-4 + y *
520 (0.2457520174e-5 + y * (-0.240337019e-6))))
521 ans2 = 0.04687499995 + y * (-0.2002690873e-3 + y *
522 (0.8449199096e-5 + y *
523 (-0.88228987e-6 + y * (0.105787412e-6))))
524 ans = sqrt(0.636619772 / ax) * (cos(xx) * ans1 - z * sin(xx) * ans2)
525 if (x < 0.0):
526 ans = -ans
527 if (x == 0.0):
528 out = 0.5
529 else:
530 out = ans / x
531 if (normalize):
532 out = out / 0.5
533 result.append(out)
534 return(result)
536 def jinc(self, x, jwidth):
537 """Compute JINC value.
539 The peak of this function is 0.5.
540 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
541 """
542 argument = np.pi * np.abs(x) / jwidth
543 np.seterr(invalid='ignore') # prevent warning for central point
544 result = scipy.special.j1(argument) / argument
545 np.seterr(invalid='warn')
546 for i in range(len(x)):
547 if (abs(x[i]) < 1e-8):
548 result[i] = 0.5
549 return result
551 def gjincGauss(self, x, gwidth):
552 return (np.exp(-np.log(2) * (x / float(gwidth))**2))
554 #
555 # Airly disk
556 #
557 def get_pb_kernel(self, axis, diam, ref_freq, epsilon=0.0):
558 """Return Airy Disk array.
560 Return Airy Disk array defined by the axis, diameter, reference frequency
561 and ratio of central hole and antenna diameter
563 axis: x-axis values
564 diameter: antenna diameter in unit of m
565 reference frequency: the reference frequency of the image
566 epsilon: ratio of central hole diameter to antenna diameter
567 """
568 a = (self.rootAiryIntensity(axis, epsilon))**2
569 airyfwhm = findFWHM(axis, a)
570 fwhm = primaryBeamArcsec(frequency=ref_freq, diameter=diam,
571 taper=self.taper, showEquation=False,
572 obscuration=diam * epsilon,
573 fwhmfactor=None)
574 ratio = fwhm / airyfwhm
575 tempaxis = axis / ratio
576 a = self.rootAiryIntensity(tempaxis, epsilon)
577 return a**2
579 def buildAiryDisk(self, fwhm, xaxisLimitInUnitsOfFwhm, convolutionPixelSize,
580 truncate=False, obscuration=0.75, diameter=12.0):
581 """Build airy disk.
583 This function computes the Airy disk (with peak of 1.0) across a grid of points
584 specified in units of the FWHM of the disk.
585 fwhm: a value in arcsec
586 xaxisLimitInUnitsOfFwhm: an integer or floating point unitless value
587 truncate: if True, truncate the function at the first null (on both sides)
588 obscuration: central hole diameter (meters)
589 diameter: dish diameter (meters)
590 obscuration and diameter are used to compute the blockage ratio (epsilon)
591 and its effect on the pattern
592 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
593 """
594 epsilon = obscuration / diameter
595 # casalog.post("Using epsilon = %f" % (epsilon))
596 myxaxis = np.arange(-xaxisLimitInUnitsOfFwhm * fwhm,
597 xaxisLimitInUnitsOfFwhm * fwhm + 0.5 * convolutionPixelSize,
598 convolutionPixelSize)
599 a = (self.rootAiryIntensity(myxaxis, epsilon))**2
600 # Scale the Airy disk to the desired FWHM, and recompute on finer grid
601 airyfwhm = findFWHM(myxaxis, a)
602 ratio = fwhm / airyfwhm
603 myxaxis = np.arange(-xaxisLimitInUnitsOfFwhm * fwhm / ratio,
604 (xaxisLimitInUnitsOfFwhm * fwhm + 0.5 * convolutionPixelSize) / ratio,
605 convolutionPixelSize / ratio)
606 a = self.rootAiryIntensity(myxaxis, epsilon)
607 if (truncate):
608 a = self.trunc(a)
609 a = a**2
610 myxaxis *= ratio
611 return(myxaxis, a)
613 def rootAiryIntensity(self, myxaxis, epsilon=0.0, showplot=False):
614 """Compute the value of 2*J1(x)/x.
616 This function computes 2*J1(x)/x, which can be squared to get an Airy disk.
617 myxaxis: the x-axis values to use
618 epsilon: radius of central hole in units of the dish diameter
619 Migrated from AnalysisUtils.py (revision 1.2204, 2015/02/18)
620 """
621 if (epsilon > 0):
622 a = (2 * spspec.j1(myxaxis) / myxaxis -
623 epsilon**2 * 2 * spspec.j1(myxaxis * epsilon) /
624 (epsilon * myxaxis)) / (1 - epsilon**2)
625 else:
626 a = 2 * spspec.j1(myxaxis) / myxaxis # simpler formula for epsilon=0
627 return(a)
629 def trunc(self, result):
630 """Truncate a list at the first null.
632 Truncates a list at the first null on both sides of the center,
633 starting at the center and moving outward in each direction.
634 Assumes the list is positive in the center, e.g. a Gaussian beam.
635 -Todd Hunter
636 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
637 """
638 # casa default truncate=-1, which means truncate at radius of first null
639 mask = np.zeros(len(result))
640 truncateBefore = 0
641 truncateBeyond = len(mask)
642 for r in range(len(result) // 2, len(result)):
643 if (result[r] < 0):
644 truncateBeyond = r
645 break
646 for r in range(len(result) // 2, 0, -1):
647 if (result[r] < 0):
648 truncateBefore = r
649 break
650 mask[truncateBefore:truncateBeyond] = 1
651 # casalog.post(
652 # "Truncating outside of pixels %d-%d (len=%d)" %
653 # (truncateBefore,truncateBeyond-1,len(mask)))
654 result *= mask
655 return result
657 def gaussfit_errfunc(self, parameters, x, y):
658 """Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)."""
659 return (y - self.gauss(x, parameters))
661 def gaussfit(self, x, y, showplot=False, minlevel=0, verbose=False,
662 title=None, truncate=False):
663 """Perform 1D Gaussian fit.
665 Fits a 1D Gaussian assumed to be centered at x=0 with amp=1 to the
666 specified data, with an option to truncate it at some level.
667 Returns the FWHM and truncation point.
668 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
669 """
670 fwhm_guess = findFWHM(x, y)
671 if (truncate == False):
672 parameters = np.asarray([fwhm_guess], dtype=np.float64)
673 else:
674 parameters = np.asarray([fwhm_guess, truncate], dtype=np.float64)
675 if (verbose):
676 casalog.post("Fitting for %d parameters: guesses = %s" % (len(parameters), parameters))
677 xx = np.asarray(x, dtype=np.float64)
678 yy = np.asarray(y, dtype=np.float64)
679 lenx = len(x)
680 if (minlevel > 0):
681 xwidth = findFWHM(x, y, minlevel)
682 xx = x[np.where(np.abs(x) < xwidth * 0.5)[0]]
683 yy = y[np.where(np.abs(x) < xwidth * 0.5)[0]]
684 if (verbose):
685 casalog.postt("Keeping %d/%d points, guess = %f arcsec" %
686 (len(x), lenx, fwhm_guess))
687 result = optimize.leastsq(self.gaussfit_errfunc, parameters, args=(xx, yy),
688 full_output=1)
689 bestParameters = result[0]
690 infodict = result[2]
691 numberFunctionCalls = infodict['nfev']
692 mesg = result[3]
693 ier = result[4]
694 if (verbose):
695 casalog.post("optimize.leastsq: ier=%d, #calls=%d, message = %s" %
696 (ier, numberFunctionCalls, mesg))
697 if (type(bestParameters) == list or type(bestParameters) == np.ndarray):
698 fwhm = bestParameters[0]
699 if verbose:
700 casalog.post("fitted FWHM = %f" % (fwhm))
701 if (truncate != False):
702 truncate = bestParameters[1]
703 casalog.post("optimized truncation = %f" % (truncate))
704 else:
705 fwhm = bestParameters
706 return(fwhm, truncate)
709def findFWHM(x, y, level=0.5, s=0):
710 """Measure the FWHM of the specified profile.
712 This works well in a noise-free environment.
713 The data are assumed to be sorted by the x variable.
714 x: the position variable
715 y: the intensity variable
716 level: the signal level for which to find the full width
717 s: see help scipy.interpolate.UnivariateSpline
718 -Todd Hunter
719 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
720 """
721 halfmax = np.max(y) * level
722 spline = scipy.interpolate.UnivariateSpline(x, y - halfmax, s=s)
723 result = spline.roots()
724 if (len(result) == 2):
725 x0, x1 = result
726 return(abs(x1 - x0))
727 elif (len(result) == 1):
728 return(2 * abs(result[0]))
729 else:
730 # modified (KS 2015/02/19)
731 # casalog.post(
732 # f"More than two crossings ({len(result)}), "
733 # "fitting slope to points near that power level."
734 # )
735 # result = 2*findZeroCrossingBySlope(x, y-halfmax)
736 # return(result)
737 errmsg = "Unsupported FWHM search in CASA. " + \
738 f"More than two corssings ({len(result)}) at level {halfmax} ({level} % of peak)."
739 raise Exception(errmsg)
742def primaryBeamArcsec(frequency, diameter, obscuration, taper,
743 showEquation=True, use2007formula=True, fwhmfactor=None):
744 """Implement the Baars formula: b*lambda / D.
746 if use2007formula==False, use the formula from ALMA Memo 456
747 if use2007formula==True, use the formula from Baars 2007 book
748 (see au.baarsTaperFactor)
749 In either case, the taper value is expected to be entered as positive.
750 Note: if a negative value is entered, it is converted to positive.
751 The effect of the central obstruction on the pattern is also accounted for
752 by using a spline fit to Table 10.1 of Schroeder's Astronomical Optics.
753 fwhmfactor: if given, then ignore the taper
754 For further help and examples, see https://safe.nrao.edu/wiki/bin/view/ALMA/PrimaryBeamArcsec
755 -- Todd Hunter
756 Simplified version of the one in AnalysisUtils (revision 1.2204, 2015/02/18)
757 """
758 if (fwhmfactor is not None):
759 taper = effectiveTaper(fwhmfactor, diameter, obscuration, use2007formula)
760 if (taper is None):
761 return
762 if (taper < 0):
763 taper = abs(taper)
764 if (obscuration > 0.4 * diameter):
765 casalog.post(
766 "This central obscuration is too large for the method of calculation employed here.")
767 return
768 if (type(frequency) == str):
769 my_qa = quanta()
770 frequency = my_qa.getvalue(my_qa.convert(frequency, "Hz"))[0]
771 lambdaMeters = 2.99792458e8 / frequency
772 b = baarsTaperFactor(taper, use2007formula) * centralObstructionFactor(diameter, obscuration)
773 if (showEquation):
774 if (use2007formula):
775 formula = "Baars (2007) Eq 4.13"
776 else:
777 formula = "ALMA memo 456 Eq. 18"
778 casalog.post(
779 f"Coefficient from {formula} for a -{taper:.1f}dB edge taper "
780 f"and obscuration ratio={obscuration:g}/{diameter:g} = {b:.3f}*lambda/D")
781 return(b * lambdaMeters * 3600 * 180 / (diameter * np.pi))
784def effectiveTaper(fwhmFactor=1.16, diameter=12, obscuration=0.75,
785 use2007formula=True):
786 """Compute effective taper from constant factor for illumination pattern.
788 The inverse of (Baars formula multiplied by the central
789 obstruction factor). Converts an observed value of the constant X in
790 the formula FWHM=X*lambda/D into a taper in dB (positive value).
791 if use2007formula == False, use Equation 18 from ALMA Memo 456
792 if use2007formula == True, use Equation 4.13 from Baars 2007 book
793 -- Todd Hunter
794 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
795 """
796 cOF = centralObstructionFactor(diameter, obscuration)
797 if (fwhmFactor < 1.02 or fwhmFactor > 1.22):
798 casalog.post("Invalid fwhmFactor (1.02<fwhmFactor<1.22)")
799 return
800 if (baarsTaperFactor(10, use2007formula) * cOF < fwhmFactor):
801 increment = 0.01
802 for taper_dB in np.arange(10, 10 + increment * 1000, increment):
803 if (baarsTaperFactor(taper_dB, use2007formula) * cOF - fwhmFactor > 0):
804 break
805 else:
806 increment = -0.01
807 for taper_dB in np.arange(10, 10 + increment * 1000, increment):
808 if (baarsTaperFactor(taper_dB, use2007formula) * cOF - fwhmFactor < 0):
809 break
810 return(taper_dB)
813def baarsTaperFactor(taper_dB, use2007formula=True):
814 """Convert a taper to the constant factor for illumination pattern.
816 Converts a taper in dB to the constant X
817 in the formula FWHM=X*lambda/D for the parabolic illumination pattern.
818 We assume that taper_dB comes in as a positive value.
819 use2007formula: False --> use Equation 18 from ALMA Memo 456.
820 True --> use Equation 4.13 from Baars 2007 book
821 - Todd Hunter
822 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
823 """
824 tau = 10**(-0.05 * taper_dB)
825 if (use2007formula):
826 return(1.269 - 0.566 * tau + 0.534 * (tau**2) - 0.208 * (tau**3))
827 else:
828 return(1.243 - 0.343 * tau + 0.12 * (tau**2))
831def centralObstructionFactor(diameter=12.0, obscuration=0.75):
832 """Compute the scale factor of an Airy pattern.
834 Computes the scale factor of an Airy pattern as a function of the
835 central obscuration, using Table 10.1 of Schroeder's 'Astronomical Optics'.
836 -- Todd Hunter
837 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18)
838 """
839 epsilon = obscuration / diameter
840 myspline = scipy.interpolate.UnivariateSpline(
841 [0, 0.1, 0.2, 0.33, 0.4], [1.22, 1.205, 1.167, 1.098, 1.058], s=0)
842 factor = myspline(epsilon) / 1.22
843 if (type(factor) == np.float64):
844 # casapy 4.2
845 return(factor)
846 else:
847 # casapy 4.1 and earlier
848 return(factor[0])