Coverage for /home/casatest/venv/lib/python3.12/site-packages/casatasks/private/sdbeamutil.py: 12%

443 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-21 07:43 +0000

1 

2import numpy as np 

3from numpy import cos, sin, sqrt 

4import scipy.interpolate 

5import scipy.optimize as optimize 

6import scipy.signal 

7import scipy.special as spspec 

8 

9from casatasks import casalog 

10from casatools import quanta 

11 

12 

13################################################## 

14# Prediction of theoretical beam size 

15################################################## 

16class TheoreticalBeam: 

17 """ 

18 The class to derive the theoretical beam size of an image. 

19 

20 Example: 

21 bu = sdbeamutil.TheoreticalBeam() 

22 # set imaging, antenna, and pointing informations 

23 bu.set_antenna('12m',blockage='0.75m',taper=10) 

24 bu.set_sampling(['12arcsec','12arcsec']) 

25 bu.set_image_param('5arcsec', '115GHz','SF', 6, -1, -1, -1) 

26 # print summary of setup to logger 

27 bu.summary() 

28 # get theoretical beam size of an image. 

29 beam = bu.get_beamsize_image() 

30 """ 

31 

32 def __init__(self): 

33 self.is_antenna_set = False 

34 self.is_kernel_set = False 

35 self.is_sampling_set = False 

36 self.antenna_diam_m = -1. 

37 self.antenna_block_m = 0.0 

38 self.taper = 10 

39 self.ref_freq = -1. 

40 self.kernel_type = "" 

41 self.kernel_param = {} 

42 self.pa = "0.0deg" 

43 self.sampling_arcsec = [] 

44 self.cell_arcsec = [] 

45 

46 def __to_arcsec_list(self, angle): 

47 """Return a list of angles in arcsec (value only without unit).""" 

48 if type(angle) not in [list, tuple, np.ndarray]: 

49 angle = [angle] 

50 return [self.__to_arcsec(val) for val in angle] 

51 

52 def __to_arcsec(self, angle): 

53 """Convert angle to arcsec and return the value without unit.""" 

54 my_qa = quanta() 

55 if my_qa.isangle(angle): 

56 return my_qa.getvalue(my_qa.convert(angle, "arcsec"))[0] 

57 elif my_qa.getunit(angle) == '': 

58 return float(angle) 

59 else: 

60 raise ValueError("Invalid angle: %s" % (str(angle))) 

61 

62 def __parse_width(self, val, cell_size_arcsec): 

63 """Convert value in unit of arcsec. 

64 

65 If val is angle, returns a float value in unit of arcsec. 

66 else the unit is assumed to be pixel and multiplied by cell_size_arcsec 

67 """ 

68 my_qa = quanta() 

69 if my_qa.isangle(val): 

70 return self.__to_arcsec(val) 

71 elif my_qa.getunit(val) in ('', 'pixel'): 

72 return my_qa.getvalue(val)[0] * cell_size_arcsec 

73 else: 

74 raise ValueError("Invalid width %s" % str(val)) 

75 

76 def set_antenna(self, diam, blockage="0.0m", taper=10): 

77 """Set parameters to construct antenna beam. 

78 

79 antenna diameter and blockage 

80 taper: the illumination taper in dB 

81 """ 

82 # try quantity 

83 my_qa = quanta() 

84 self.antenna_diam_m = my_qa.getvalue(my_qa.convert(diam, "m"))[0] 

85 self.antenna_block_m = my_qa.getvalue(my_qa.convert(blockage, "m"))[0] 

86 self.taper = taper 

87 self.is_antenna_set = True 

88 

89 def set_sampling(self, intervals, pa="0deg"): 

90 """Set sampling interval of observation. 

91 

92 intervals: pointing inteval of observation (['10arcsec','10arcsec']) 

93 pa: position angle (NOT USED) 

94 """ 

95 self.pa = pa 

96 self.sampling_arcsec = [abs(a) for a in 

97 self.__to_arcsec_list(intervals)] 

98 self.is_sampling_set = True 

99 

100 def set_image_param(self, cell, ref_freq, gridfunction, 

101 convsupport, truncate, gwidth, jwidth, is_alma=False): 

102 """Set imaging parameters. 

103 

104 cell: image pixel size 

105 ref_freq: reference frequency of image to calculate beam size 

106 gridfunction, convsupport, truncate, gwidth, jwidth: 

107 parameters passed to imager 

108 is_alma: valid only for PB kernel to use 10.7m 

109 """ 

110 self.ref_freq = ref_freq 

111 self.cell_arcsec = [abs(a) for a in 

112 self.__to_arcsec_list(cell)] 

113 if gridfunction.upper() == "SF": 

114 self.__set_sf_kernel(convsupport) 

115 elif gridfunction.upper() == "GJINC": 

116 self.__set_gjinc_kernel(truncate, gwidth, jwidth) 

117 elif gridfunction.upper() == "GAUSS": 

118 self.__set_gauss_kernel(truncate, gwidth) 

119 elif gridfunction.upper() == "BOX": 

120 self.__set_box_kernel(self.cell_arcsec[0]) 

121 elif gridfunction.upper() == "PB": 

122 self.__set_pb_kernel(is_alma) 

123 self.is_kernel_set = True 

124 

125 def __set_sf_kernel(self, convsupport): 

126 """Set SF kernel parameter to the class.""" 

127 self.kernel_type = "SF" 

128 self.kernel_param = dict(convsupport=convsupport) 

129 

130 def __set_gjinc_kernel(self, truncate, gwidth, jwidth): 

131 """Set GJINC kernel parameter to the class.""" 

132 self.kernel_type = "GJINC" 

133 self.kernel_param = dict(truncate=truncate, gwidth=gwidth, jwidth=jwidth) 

134 

135 def __set_gauss_kernel(self, truncate, gwidth): 

136 """Set GAUSS kernel parameter to the class.""" 

137 self.kernel_type = "GAUSS" 

138 self.kernel_param = dict(truncate=truncate, gwidth=gwidth) 

139 

140 def __set_box_kernel(self, width): 

141 """Set BOX kernel parameter to the class.""" 

142 self.kernel_type = "BOX" 

143 self.kernel_param = dict(width=width) 

144 

145 def __set_pb_kernel(self, alma=False): 

146 """Set PB kernel parameter to the class.""" 

147 self.kernel_type = "PB" 

148 self.kernel_param = dict(alma=alma) 

149 

150 def get_beamsize_image(self): 

151 """Return FWHM of theoretical beam size in image. 

152 

153 The FWHM is derived by fitting gridded beam with Gaussian function. 

154 """ 

155 # assert necessary information is set 

156 self.__assert_antenna() 

157 self.__assert_kernel() 

158 self.__assert_sampling() 

159 casalog.post("Calculating theoretical beam size of the image") 

160 # construct theoretic beam for image 

161 axis, beam = self.get_antenna_beam() 

162 casalog.post( 

163 f"Length of convolution array={len(axis)}, " 

164 f"total width={axis[-1] - axis[0]} arcsec, " 

165 f"separation={axis[1] - axis[0]} arcsec", 

166 priority="DEBUG1") 

167 kernel = self.get_kernel(axis) 

168 sampling = self.get_box_kernel(axis, self.sampling_arcsec[0]) 

169 # convolution 

170 gridded = np.convolve(beam, kernel, mode='same') 

171 gridded /= max(gridded) 

172 result = np.convolve(gridded, sampling, mode='same') 

173 result /= max(result) 

174 # fwhm_arcsec = findFWHM(axis,result) 

175 fwhm_arcsec, dummy = self.gaussfit(axis, result, minlevel=0.0, truncate=False) 

176 casalog.post("- initial FWHM of beam = %f arcsec" % 

177 findFWHM(axis, beam)) 

178 casalog.post("- FWHM of gridding kernel = %f arcsec" % 

179 findFWHM(axis, kernel)) 

180 casalog.post("- FWHM of theoretical beam = %f arcsec" % 

181 findFWHM(axis, result)) 

182 casalog.post("- FWHM of theoretical beam (gauss fit) = %f arcsec" % 

183 fwhm_arcsec) 

184 del result 

185 if len(self.sampling_arcsec) == 1 and \ 

186 (len(self.cell_arcsec) == 1 or self.kernel_type != "BOX"): 

187 fwhm_geo_arcsec = fwhm_arcsec 

188 else: 

189 if len(self.sampling_arcsec) > 1: 

190 sampling = self.get_box_kernel(axis, self.sampling_arcsec[1]) 

191 elif self.kernel_type == "BOX" and len(self.cell_arcsec) > 1: 

192 kernel = self.get_box_kernel(axis, self.cell_arcsec[1]) 

193 gridded = np.convolve(beam, kernel, mode='same') 

194 gridded /= max(gridded) 

195 result = np.convolve(gridded, sampling, mode='same') 

196 result /= max(result) 

197 # fwhm1 = findFWHM(axis,result) 

198 

199 # Gaussian fit to the result convolved with additional kernel 

200 # to take into account raster pointing pattern. 

201 # CAS-14608 

202 # Note that the fitting could fail especially when the 

203 # pointing pattern is peculiar due to hardware/software problem 

204 # related to the antenna. In that case, the FWHM estimate will 

205 # fall back to theoretical beam size. 

206 try: 

207 fwhm1, _ = self.gaussfit(axis, result, minlevel=0.0, truncate=False) 

208 fwhm_geo_arcsec = np.sqrt(fwhm_arcsec * fwhm1) 

209 casalog.post("The second axis") 

210 casalog.post("- FWHM of gridding kernel = %f arcsec" % 

211 findFWHM(axis, kernel)) 

212 casalog.post("- FWHM of theoretical beam = %f arcsec" % 

213 findFWHM(axis, result)) 

214 casalog.post("- FWHM of theoretical beam (gauss fit) = %f arcsec" % 

215 fwhm1) 

216 except Exception as e: 

217 casalog.post( 

218 "Gaussian fit to get effective beam size failed" 

219 f" with the following error: {e} \n" 

220 "Falling back to the theoretical beam size" 

221 " derived from the antenna beam pattern.", 

222 priority="WARN" 

223 ) 

224 fwhm_geo_arcsec = fwhm_arcsec 

225 finally: 

226 del result 

227 # clear-up axes 

228 del axis, beam, kernel, sampling, gridded 

229 

230 return dict(major="%farcsec" % (fwhm_geo_arcsec), 

231 minor="%farcsec" % (fwhm_geo_arcsec), 

232 pa=self.pa) 

233 

234 def __assert_antenna(self): 

235 """Raise an error if antenna information is not set.""" 

236 if not self.is_antenna_set: 

237 raise RuntimeError("Antenna is not set") 

238 

239 def __assert_kernel(self): 

240 """Raise an error if imaging parameters are not set.""" 

241 if not self.is_kernel_set: 

242 raise RuntimeError("Kernel is not set.") 

243 

244 def __assert_sampling(self): 

245 """Raise an error if sampling information is not set.""" 

246 if not self.is_sampling_set: 

247 raise RuntimeError("Sampling information is not set") 

248 

249 def get_antenna_beam(self): 

250 """Return arrays of antenna beam response and it's horizontal axis. 

251 

252 Picked from AnalysisUtils (revision 1.2204, 2015/02/18) 

253 """ 

254 self.__assert_antenna() 

255 self.__assert_kernel() 

256 

257 fwhm_arcsec = primaryBeamArcsec(frequency=self.ref_freq, 

258 diameter=self.antenna_diam_m, 

259 taper=self.taper, showEquation=True, 

260 obscuration=self.antenna_block_m, 

261 fwhmfactor=None) 

262 truncate = False 

263 convolutionPixelSize = 0.02 # arcsec 

264 # avoid too coarse convolution array w.r.t. sampling 

265 if self.is_sampling_set and \ 

266 min(self.sampling_arcsec) < 5 * convolutionPixelSize: 

267 convolutionPixelSize = min(self.sampling_arcsec) / 5.0 

268 # avoid too fine convolution arrays. 

269 sizes = list(self.cell_arcsec) + [fwhm_arcsec] 

270 if self.is_sampling_set: 

271 sizes += list(self.sampling_arcsec) 

272 minsize = min(sizes) 

273 support_min = 1000. 

274 support_fwhm = 5000. 

275 if minsize > support_min * convolutionPixelSize: 

276 convolutionPixelSize = minsize / support_min 

277 if fwhm_arcsec > support_fwhm * convolutionPixelSize: 

278 convolutionPixelSize = min(fwhm_arcsec / support_fwhm, minsize / 10.) 

279 

280 if (self.taper < 0.1): 

281 # Airly disk 

282 return self.buildAiryDisk(fwhm_arcsec, 3., convolutionPixelSize, 

283 truncate=truncate, 

284 obscuration=self.antenna_block_m, 

285 diameter=self.antenna_diam_m) 

286 else: 

287 # Gaussian beam 

288 myxaxis = np.arange(-3 * fwhm_arcsec, 

289 3 * fwhm_arcsec + 0.5 * convolutionPixelSize, 

290 convolutionPixelSize) 

291 myfunction = self.gauss(myxaxis, [fwhm_arcsec, truncate]) 

292 return myxaxis, myfunction 

293 

294 def get_kernel(self, axis): 

295 """Return imaging kernel array.""" 

296 self.__assert_kernel() 

297 if self.kernel_type == "SF": 

298 # TODO: what to do for cell[0]!=cell[1]??? 

299 return self.get_sf_kernel(axis, self.kernel_param['convsupport'], 

300 self.cell_arcsec[0]) 

301 elif self.kernel_type == "GJINC": 

302 return self.get_gjinc_kernel(axis, self.kernel_param['truncate'], 

303 self.kernel_param['gwidth'], 

304 self.kernel_param['jwidth'], 

305 self.cell_arcsec[0]) 

306 elif self.kernel_type == "GAUSS": 

307 return self.get_gauss_kernel(axis, self.kernel_param['truncate'], 

308 self.kernel_param['gwidth'], 

309 self.cell_arcsec[0]) 

310 elif self.kernel_type == "BOX": 

311 return self.get_box_kernel(axis, self.kernel_param['width']) 

312 elif self.kernel_type == "PB": 

313 diam = self.antenna_diam_m 

314 if self.kernel_param['alma']: 

315 diam = 10.7 

316 casalog.post( 

317 f"Using effective antenna diameter {diam}m for " 

318 f"{self.kernel_type} kernel of ALMA antennas") 

319 epsilon = self.antenna_block_m / diam 

320 return self.get_pb_kernel(axis, diam, self.ref_freq, epsilon=epsilon) 

321 # return (self.rootAiryIntensity(axis, epsilon))**2 

322 else: 

323 raise RuntimeError("Invalid kernel: %s" % self.kernel_type) 

324 

325 def summary(self): 

326 """Print summary of parameters set to the class.""" 

327 casalog.post("=" * 40) 

328 casalog.post("Summary of Image Beam Parameters") 

329 casalog.post("=" * 40) 

330 casalog.post("[Antenna]") 

331 self.__antenna_summary() 

332 

333 casalog.post("\n[Imaging Parameters]") 

334 self.__kernel_summary() 

335 

336 casalog.post("\n[Sampling]") 

337 self.__sampling_summary() 

338 

339 def __notset_message(self): 

340 casalog.post("Not set.") 

341 

342 def __antenna_summary(self): 

343 """Print summary of antenna setup.""" 

344 if not self.is_antenna_set: 

345 self.__notset_message() 

346 return 

347 casalog.post("diameter: %f m" % (self.antenna_diam_m)) 

348 casalog.post("blockage: %f m" % (self.antenna_block_m)) 

349 

350 def __kernel_summary(self): 

351 """Print summary of imaging parameter setup.""" 

352 if not self.is_kernel_set: 

353 self.__notset_message() 

354 return 

355 casalog.post("reference frequency: %s" % str(self.ref_freq)) 

356 casalog.post("cell size: %s arcsec" % str(self.cell_arcsec)) 

357 casalog.post("kernel type: %s" % self.kernel_type) 

358 for key, val in self.kernel_param.items(): 

359 casalog.post("%s: %s" % (key, str(val))) 

360 

361 def __sampling_summary(self): 

362 """Print summary of sampling setup.""" 

363 if not self.is_sampling_set: 

364 self.__notset_message() 

365 return 

366 casalog.post("sampling interval: %s arcsec" % str(self.sampling_arcsec)) 

367 casalog.post("position angle: %s" % (self.pa)) 

368 

369 # 

370 # Construct Kernel arrays 

371 # 

372 

373 # 

374 # BOX 

375 # 

376 def get_box_kernel(self, axis, width): 

377 """Return a box kernel array with specified width. 

378 

379 axis: an array of xaxis values 

380 width: kernel width 

381 output array 

382 out[i] = 1.0 (-width/2.0 <= x[i] <= width/2.0) 

383 = 0.0 (else) 

384 """ 

385 data = np.zeros(len(axis)) 

386 indices = np.where(abs(axis) <= width / 2.0) 

387 data[indices] = 1.0 

388 return data 

389 

390 # 

391 # SF 

392 # 

393 def get_sf_kernel(self, axis, convsupport, cell_size): 

394 """Return spheroidal kernel array. 

395 

396 axis: an array of xaxis values 

397 convsupport: the truncation radius of SF kernel in pixel unit. 

398 cell_size: image pixel size 

399 

400 Modified version of one in AnalysisUtils.sfBeamPredict (revision 1.2204, 2015/02/18) 

401 """ 

402 convsupport = 3 if convsupport == -1 else convsupport 

403 supportwidth = (convsupport * 1.0 + 0.0) 

404 # value obtained by matching Fred's grdsf.f output with scipy(m=0,n=0) 

405 c = 5.356 * np.pi / 2.0 

406 sfaxis = axis / float(supportwidth * cell_size * 1.0) 

407 indices = np.where(abs(sfaxis) < 1)[0] 

408 centralRegion = sfaxis[indices] 

409 centralRegionY = self.spheroidalWaveFunction(centralRegion, 0, 0, c, 1) 

410 mysf = np.zeros(len(axis)) 

411 mysf[indices] += centralRegionY / max(centralRegionY) 

412 return mysf 

413 

414 def spheroidalWaveFunction(self, x, m=0, n=0, c=0, alpha=0): 

415 """Return spheroidal wave function. 

416 

417 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

418 """ 

419 if (type(x) != list and type(x) != np.ndarray): 

420 returnScalar = True 

421 x = [x] 

422 else: 

423 returnScalar = False 

424 cv = scipy.special.pro_cv(m, n, c) # get the eigenvalue 

425 result = scipy.special.pro_ang1_cv(m, n, c, cv, x)[0] 

426 for i in range(len(x)): 

427 nu = x[i] # (i-0.5*len(x))/(0.5*len(x)) # only true if x is symmetric about zero 

428 result[i] *= (1 - nu**2)**alpha 

429 # The peak of this function is about 10000 for m=0,n=0,c=6 

430 if (returnScalar): 

431 return result[0] 

432 else: 

433 return result 

434 

435 # 

436 # GAUSS 

437 # 

438 def get_gauss_kernel(self, axis, truncate, gwidth, cell_arcsec): 

439 """Return a gaussian kernel array. 

440 

441 axis : an array of xaxis values 

442 truncate : truncation radius 

443 NOTE definition is different from truncate in gauss()! 

444 gwidth : kernel gwidth 

445 cell_arcsec : image pixel size in unit of arcsec 

446 """ 

447 if gwidth == -1: 

448 gwidth = np.sqrt(np.log(2.0)) 

449 gwidth_arcsec = self.__parse_width(gwidth, cell_arcsec) 

450 # get gauss for full axis 

451 result = self.gauss(axis, [gwidth_arcsec]) 

452 # truncate kernel outside the truncation radius 

453 if truncate == -1: 

454 trunc_arcsec = gwidth_arcsec * 1.5 

455 elif truncate is not None: 

456 trunc_arcsec = self.__parse_width(truncate, cell_arcsec) 

457 idx = np.where(abs(axis) > trunc_arcsec) 

458 result[idx] = 0. 

459 return result 

460 

461 def gauss(self, x, parameters): 

462 """Compute the value of the Gaussian function. 

463 

464 Compute the value of the Gaussian function at the specified 

465 location(s) with respect to the peak (which is assumed to be at x=0). 

466 truncate: if not None, then set result to zero if below this value. 

467 -Todd Hunter 

468 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

469 """ 

470 if (type(parameters) != np.ndarray and type(parameters) != list): 

471 parameters = np.array([parameters]) 

472 if (len(parameters) < 2): 

473 parameters = np.array([parameters[0], 0]) 

474 fwhm = parameters[0] 

475 x = np.asarray(x, dtype=np.float64) 

476 sigma = fwhm / 2.3548201 

477 result = np.exp(-(x**2 / (2.0 * sigma**2))) 

478 idx = np.where(result < parameters[1])[0] 

479 result[idx] = 0 

480 return result 

481 

482 # 

483 # GJINC 

484 # 

485 def get_gjinc_kernel(self, axis, truncate, gwidth, jwidth, cell_arcsec): 

486 """Return a GJinc kernel array. 

487 

488 axis : an array of xaxis values 

489 truncate : truncation radius (NOT SUPPORTED YET) 

490 gwidth jwidth : kernel gwidth 

491 cell_arcsec : image pixel size in unit of arcsec 

492 """ 

493 if gwidth == -1: 

494 gwidth = 2.52 * np.sqrt(np.log(2.0)) 

495 if jwidth == -1: 

496 jwidth = 1.55 

497 gwidth_arcsec = self.__parse_width(gwidth, cell_arcsec) 

498 jwidth_arcsec = self.__parse_width(jwidth, cell_arcsec) 

499 mygjinc = self.trunc(self.gjinc(axis, gwidth=gwidth_arcsec, 

500 jwidth=jwidth_arcsec, 

501 useCasaJinc=True, normalize=False)) 

502 return mygjinc 

503 

504 def gjinc(self, x, gwidth, jwidth, useCasaJinc=False, normalize=False): 

505 """Migrated from AnalysisUtils (revision 1.2204, 2015/02/18).""" 

506 if (useCasaJinc): 

507 result = self.grdjinc1(x, jwidth, normalize) * self.gjincGauss(x, gwidth) 

508 else: 

509 result = self.jinc(x, jwidth) * self.gjincGauss(x, gwidth) 

510 return result 

511 

512 def grdjinc1(self, val, c, normalize=True): 

513 """Migrated from AnalysisUtils (revision 1.2204, 2015/02/18).""" 

514 # Casa's function 

515 # // Calculate J_1(x) using approximate formula 

516 xs = np.pi * val / c 

517 result = [] 

518 for x in xs: 

519 x = abs(x) # I added this to make it symmetric 

520 ax = abs(x) 

521 if (ax < 8.0): 

522 y = x * x 

523 ans1 = x * (72362614232.0 + y * 

524 (-7895059235.0 + y * (242396853.1 + y * 

525 (-2972611.439 + y * (15704.48260 + y * (-30.16036606)))))) 

526 ans2 = 144725228442.0 + y * (2300535178.0 + y * 

527 (18583304.74 + y * (99447.43394 + y * 

528 (376.9991397 + y * 1.0)))) 

529 ans = ans1 / ans2 

530 else: 

531 z = 8.0 / ax 

532 y = z * z 

533 xx = ax - 2.356194491 

534 ans1 = 1.0 + y * (0.183105e-2 + y * 

535 (-0.3516396496e-4 + y * 

536 (0.2457520174e-5 + y * (-0.240337019e-6)))) 

537 ans2 = 0.04687499995 + y * (-0.2002690873e-3 + y * 

538 (0.8449199096e-5 + y * 

539 (-0.88228987e-6 + y * (0.105787412e-6)))) 

540 ans = sqrt(0.636619772 / ax) * (cos(xx) * ans1 - z * sin(xx) * ans2) 

541 if (x < 0.0): 

542 ans = -ans 

543 if (x == 0.0): 

544 out = 0.5 

545 else: 

546 out = ans / x 

547 if (normalize): 

548 out = out / 0.5 

549 result.append(out) 

550 return(result) 

551 

552 def jinc(self, x, jwidth): 

553 """Compute JINC value. 

554 

555 The peak of this function is 0.5. 

556 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

557 """ 

558 argument = np.pi * np.abs(x) / jwidth 

559 np.seterr(invalid='ignore') # prevent warning for central point 

560 result = scipy.special.j1(argument) / argument 

561 np.seterr(invalid='warn') 

562 for i in range(len(x)): 

563 if (abs(x[i]) < 1e-8): 

564 result[i] = 0.5 

565 return result 

566 

567 def gjincGauss(self, x, gwidth): 

568 return (np.exp(-np.log(2) * (x / float(gwidth))**2)) 

569 

570 # 

571 # Airly disk 

572 # 

573 def get_pb_kernel(self, axis, diam, ref_freq, epsilon=0.0): 

574 """Return Airy Disk array. 

575 

576 Return Airy Disk array defined by the axis, diameter, reference frequency 

577 and ratio of central hole and antenna diameter 

578 

579 axis: x-axis values 

580 diameter: antenna diameter in unit of m 

581 reference frequency: the reference frequency of the image 

582 epsilon: ratio of central hole diameter to antenna diameter 

583 """ 

584 a = (self.rootAiryIntensity(axis, epsilon))**2 

585 airyfwhm = findFWHM(axis, a) 

586 fwhm = primaryBeamArcsec(frequency=ref_freq, diameter=diam, 

587 taper=self.taper, showEquation=False, 

588 obscuration=diam * epsilon, 

589 fwhmfactor=None) 

590 ratio = fwhm / airyfwhm 

591 tempaxis = axis / ratio 

592 a = self.rootAiryIntensity(tempaxis, epsilon) 

593 return a**2 

594 

595 def buildAiryDisk(self, fwhm, xaxisLimitInUnitsOfFwhm, convolutionPixelSize, 

596 truncate=False, obscuration=0.75, diameter=12.0): 

597 """Build airy disk. 

598 

599 This function computes the Airy disk (with peak of 1.0) across a grid of points 

600 specified in units of the FWHM of the disk. 

601 fwhm: a value in arcsec 

602 xaxisLimitInUnitsOfFwhm: an integer or floating point unitless value 

603 truncate: if True, truncate the function at the first null (on both sides) 

604 obscuration: central hole diameter (meters) 

605 diameter: dish diameter (meters) 

606 obscuration and diameter are used to compute the blockage ratio (epsilon) 

607 and its effect on the pattern 

608 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

609 """ 

610 epsilon = obscuration / diameter 

611 # casalog.post("Using epsilon = %f" % (epsilon)) 

612 myxaxis = np.arange(-xaxisLimitInUnitsOfFwhm * fwhm, 

613 xaxisLimitInUnitsOfFwhm * fwhm + 0.5 * convolutionPixelSize, 

614 convolutionPixelSize) 

615 a = (self.rootAiryIntensity(myxaxis, epsilon))**2 

616 # Scale the Airy disk to the desired FWHM, and recompute on finer grid 

617 airyfwhm = findFWHM(myxaxis, a) 

618 ratio = fwhm / airyfwhm 

619 myxaxis = np.arange(-xaxisLimitInUnitsOfFwhm * fwhm / ratio, 

620 (xaxisLimitInUnitsOfFwhm * fwhm + 0.5 * convolutionPixelSize) / ratio, 

621 convolutionPixelSize / ratio) 

622 a = self.rootAiryIntensity(myxaxis, epsilon) 

623 if (truncate): 

624 a = self.trunc(a) 

625 a = a**2 

626 myxaxis *= ratio 

627 return(myxaxis, a) 

628 

629 def rootAiryIntensity(self, myxaxis, epsilon=0.0, showplot=False): 

630 """Compute the value of 2*J1(x)/x. 

631 

632 This function computes 2*J1(x)/x, which can be squared to get an Airy disk. 

633 myxaxis: the x-axis values to use 

634 epsilon: radius of central hole in units of the dish diameter 

635 Migrated from AnalysisUtils.py (revision 1.2204, 2015/02/18) 

636 """ 

637 if (epsilon > 0): 

638 a = (2 * spspec.j1(myxaxis) / myxaxis - 

639 epsilon**2 * 2 * spspec.j1(myxaxis * epsilon) / 

640 (epsilon * myxaxis)) / (1 - epsilon**2) 

641 else: 

642 a = 2 * spspec.j1(myxaxis) / myxaxis # simpler formula for epsilon=0 

643 return(a) 

644 

645 def trunc(self, result): 

646 """Truncate a list at the first null. 

647 

648 Truncates a list at the first null on both sides of the center, 

649 starting at the center and moving outward in each direction. 

650 Assumes the list is positive in the center, e.g. a Gaussian beam. 

651 -Todd Hunter 

652 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

653 """ 

654 # casa default truncate=-1, which means truncate at radius of first null 

655 mask = np.zeros(len(result)) 

656 truncateBefore = 0 

657 truncateBeyond = len(mask) 

658 for r in range(len(result) // 2, len(result)): 

659 if (result[r] < 0): 

660 truncateBeyond = r 

661 break 

662 for r in range(len(result) // 2, 0, -1): 

663 if (result[r] < 0): 

664 truncateBefore = r 

665 break 

666 mask[truncateBefore:truncateBeyond] = 1 

667 # casalog.post( 

668 # "Truncating outside of pixels %d-%d (len=%d)" % 

669 # (truncateBefore,truncateBeyond-1,len(mask))) 

670 result *= mask 

671 return result 

672 

673 def gaussfit_errfunc(self, parameters, x, y): 

674 """Migrated from AnalysisUtils (revision 1.2204, 2015/02/18).""" 

675 return (y - self.gauss(x, parameters)) 

676 

677 def gaussfit(self, x, y, showplot=False, minlevel=0, verbose=False, 

678 title=None, truncate=False): 

679 """Perform 1D Gaussian fit. 

680 

681 Fits a 1D Gaussian assumed to be centered at x=0 with amp=1 to the 

682 specified data, with an option to truncate it at some level. 

683 Returns the FWHM and truncation point. 

684 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

685 """ 

686 fwhm_guess = findFWHM(x, y) 

687 if (truncate == False): 

688 parameters = np.asarray([fwhm_guess], dtype=np.float64) 

689 else: 

690 parameters = np.asarray([fwhm_guess, truncate], dtype=np.float64) 

691 if (verbose): 

692 casalog.post("Fitting for %d parameters: guesses = %s" % (len(parameters), parameters)) 

693 xx = np.asarray(x, dtype=np.float64) 

694 yy = np.asarray(y, dtype=np.float64) 

695 lenx = len(x) 

696 if (minlevel > 0): 

697 xwidth = findFWHM(x, y, minlevel) 

698 xx = x[np.where(np.abs(x) < xwidth * 0.5)[0]] 

699 yy = y[np.where(np.abs(x) < xwidth * 0.5)[0]] 

700 if (verbose): 

701 casalog.postt("Keeping %d/%d points, guess = %f arcsec" % 

702 (len(x), lenx, fwhm_guess)) 

703 result = optimize.leastsq(self.gaussfit_errfunc, parameters, args=(xx, yy), 

704 full_output=1) 

705 bestParameters = result[0] 

706 infodict = result[2] 

707 numberFunctionCalls = infodict['nfev'] 

708 mesg = result[3] 

709 ier = result[4] 

710 if (verbose): 

711 casalog.post("optimize.leastsq: ier=%d, #calls=%d, message = %s" % 

712 (ier, numberFunctionCalls, mesg)) 

713 if (type(bestParameters) == list or type(bestParameters) == np.ndarray): 

714 fwhm = bestParameters[0] 

715 if verbose: 

716 casalog.post("fitted FWHM = %f" % (fwhm)) 

717 if (truncate != False): 

718 truncate = bestParameters[1] 

719 casalog.post("optimized truncation = %f" % (truncate)) 

720 else: 

721 fwhm = bestParameters 

722 return(fwhm, truncate) 

723 

724 

725def findFWHM(x, y, level=0.5, s=0): 

726 """Measure the FWHM of the specified profile. 

727 

728 This works well in a noise-free environment. 

729 The data are assumed to be sorted by the x variable. 

730 x: the position variable 

731 y: the intensity variable 

732 level: the signal level for which to find the full width 

733 s: see help scipy.interpolate.UnivariateSpline 

734 -Todd Hunter 

735 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

736 """ 

737 halfmax = np.max(y) * level 

738 spline = scipy.interpolate.UnivariateSpline(x, y - halfmax, s=s) 

739 result = spline.roots() 

740 if (len(result) == 2): 

741 x0, x1 = result 

742 return(abs(x1 - x0)) 

743 elif (len(result) == 1): 

744 return(2 * abs(result[0])) 

745 else: 

746 # modified (KS 2015/02/19) 

747 # casalog.post( 

748 # f"More than two crossings ({len(result)}), " 

749 # "fitting slope to points near that power level." 

750 # ) 

751 # result = 2*findZeroCrossingBySlope(x, y-halfmax) 

752 # return(result) 

753 errmsg = "Unsupported FWHM search in CASA. " + \ 

754 f"More than two crossings ({len(result)}) at level {halfmax} ({level} % of peak)." 

755 raise Exception(errmsg) 

756 

757 

758def primaryBeamArcsec(frequency, diameter, obscuration, taper, 

759 showEquation=True, use2007formula=True, fwhmfactor=None): 

760 """Implement the Baars formula: b*lambda / D. 

761 

762 if use2007formula==False, use the formula from ALMA Memo 456 

763 if use2007formula==True, use the formula from Baars 2007 book 

764 (see au.baarsTaperFactor) 

765 In either case, the taper value is expected to be entered as positive. 

766 Note: if a negative value is entered, it is converted to positive. 

767 The effect of the central obstruction on the pattern is also accounted for 

768 by using a spline fit to Table 10.1 of Schroeder's Astronomical Optics. 

769 fwhmfactor: if given, then ignore the taper 

770 For further help and examples, see https://safe.nrao.edu/wiki/bin/view/ALMA/PrimaryBeamArcsec 

771 -- Todd Hunter 

772 Simplified version of the one in AnalysisUtils (revision 1.2204, 2015/02/18) 

773 """ 

774 if (fwhmfactor is not None): 

775 taper = effectiveTaper(fwhmfactor, diameter, obscuration, use2007formula) 

776 if (taper is None): 

777 return 

778 if (taper < 0): 

779 taper = abs(taper) 

780 if (obscuration > 0.4 * diameter): 

781 casalog.post( 

782 "This central obscuration is too large for the method of calculation employed here.") 

783 return 

784 if (type(frequency) == str): 

785 my_qa = quanta() 

786 frequency = my_qa.getvalue(my_qa.convert(frequency, "Hz"))[0] 

787 lambdaMeters = 2.99792458e8 / frequency 

788 b = baarsTaperFactor(taper, use2007formula) * centralObstructionFactor(diameter, obscuration) 

789 if (showEquation): 

790 if (use2007formula): 

791 formula = "Baars (2007) Eq 4.13" 

792 else: 

793 formula = "ALMA memo 456 Eq. 18" 

794 casalog.post( 

795 f"Coefficient from {formula} for a -{taper:.1f}dB edge taper " 

796 f"and obscuration ratio={obscuration:g}/{diameter:g} = {b:.3f}*lambda/D") 

797 return(b * lambdaMeters * 3600 * 180 / (diameter * np.pi)) 

798 

799 

800def effectiveTaper(fwhmFactor=1.16, diameter=12, obscuration=0.75, 

801 use2007formula=True): 

802 """Compute effective taper from constant factor for illumination pattern. 

803 

804 The inverse of (Baars formula multiplied by the central 

805 obstruction factor). Converts an observed value of the constant X in 

806 the formula FWHM=X*lambda/D into a taper in dB (positive value). 

807 if use2007formula == False, use Equation 18 from ALMA Memo 456 

808 if use2007formula == True, use Equation 4.13 from Baars 2007 book 

809 -- Todd Hunter 

810 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

811 """ 

812 cOF = centralObstructionFactor(diameter, obscuration) 

813 if (fwhmFactor < 1.02 or fwhmFactor > 1.22): 

814 casalog.post("Invalid fwhmFactor (1.02<fwhmFactor<1.22)") 

815 return 

816 if (baarsTaperFactor(10, use2007formula) * cOF < fwhmFactor): 

817 increment = 0.01 

818 for taper_dB in np.arange(10, 10 + increment * 1000, increment): 

819 if (baarsTaperFactor(taper_dB, use2007formula) * cOF - fwhmFactor > 0): 

820 break 

821 else: 

822 increment = -0.01 

823 for taper_dB in np.arange(10, 10 + increment * 1000, increment): 

824 if (baarsTaperFactor(taper_dB, use2007formula) * cOF - fwhmFactor < 0): 

825 break 

826 return(taper_dB) 

827 

828 

829def baarsTaperFactor(taper_dB, use2007formula=True): 

830 """Convert a taper to the constant factor for illumination pattern. 

831 

832 Converts a taper in dB to the constant X 

833 in the formula FWHM=X*lambda/D for the parabolic illumination pattern. 

834 We assume that taper_dB comes in as a positive value. 

835 use2007formula: False --> use Equation 18 from ALMA Memo 456. 

836 True --> use Equation 4.13 from Baars 2007 book 

837 - Todd Hunter 

838 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

839 """ 

840 tau = 10**(-0.05 * taper_dB) 

841 if (use2007formula): 

842 return(1.269 - 0.566 * tau + 0.534 * (tau**2) - 0.208 * (tau**3)) 

843 else: 

844 return(1.243 - 0.343 * tau + 0.12 * (tau**2)) 

845 

846 

847def centralObstructionFactor(diameter=12.0, obscuration=0.75): 

848 """Compute the scale factor of an Airy pattern. 

849 

850 Computes the scale factor of an Airy pattern as a function of the 

851 central obscuration, using Table 10.1 of Schroeder's 'Astronomical Optics'. 

852 -- Todd Hunter 

853 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

854 """ 

855 epsilon = obscuration / diameter 

856 myspline = scipy.interpolate.UnivariateSpline( 

857 [0, 0.1, 0.2, 0.33, 0.4], [1.22, 1.205, 1.167, 1.098, 1.058], s=0) 

858 factor = myspline(epsilon) / 1.22 

859 if (type(factor) == np.float64): 

860 # casapy 4.2 

861 return(factor) 

862 else: 

863 # casapy 4.1 and earlier 

864 return(factor[0])