Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatasks/private/sdbeamutil.py: 12%

439 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-10-31 17:39 +0000

1 

2import numpy as np 

3from numpy import cos, sin, sqrt 

4import scipy.interpolate 

5import scipy.optimize as optimize 

6import scipy.signal 

7import scipy.special as spspec 

8 

9from casatasks import casalog 

10from casatools import quanta 

11 

12 

13################################################## 

14# Prediction of theoretical beam size 

15################################################## 

16class TheoreticalBeam: 

17 """ 

18 The class to derive the theoretical beam size of an image. 

19 

20 Example: 

21 bu = sdbeamutil.TheoreticalBeam() 

22 # set imaging, antenna, and pointing informations 

23 bu.set_antenna('12m',blockage='0.75m',taper=10) 

24 bu.set_sampling(['12arcsec','12arcsec']) 

25 bu.set_image_param('5arcsec', '115GHz','SF', 6, -1, -1, -1) 

26 # print summary of setup to logger 

27 bu.summary() 

28 # get theoretical beam size of an image. 

29 beam = bu.get_beamsize_image() 

30 """ 

31 

32 def __init__(self): 

33 self.is_antenna_set = False 

34 self.is_kernel_set = False 

35 self.is_sampling_set = False 

36 self.antenna_diam_m = -1. 

37 self.antenna_block_m = 0.0 

38 self.taper = 10 

39 self.ref_freq = -1. 

40 self.kernel_type = "" 

41 self.kernel_param = {} 

42 self.pa = "0.0deg" 

43 self.sampling_arcsec = [] 

44 self.cell_arcsec = [] 

45 

46 def __to_arcsec_list(self, angle): 

47 """Return a list of angles in arcsec (value only without unit).""" 

48 if type(angle) not in [list, tuple, np.ndarray]: 

49 angle = [angle] 

50 return [self.__to_arcsec(val) for val in angle] 

51 

52 def __to_arcsec(self, angle): 

53 """Convert angle to arcsec and return the value without unit.""" 

54 my_qa = quanta() 

55 if my_qa.isangle(angle): 

56 return my_qa.getvalue(my_qa.convert(angle, "arcsec"))[0] 

57 elif my_qa.getunit(angle) == '': 

58 return float(angle) 

59 else: 

60 raise ValueError("Invalid angle: %s" % (str(angle))) 

61 

62 def __parse_width(self, val, cell_size_arcsec): 

63 """Convert value in unit of arcsec. 

64 

65 If val is angle, returns a float value in unit of arcsec. 

66 else the unit is assumed to be pixel and multiplied by cell_size_arcsec 

67 """ 

68 my_qa = quanta() 

69 if my_qa.isangle(val): 

70 return self.__to_arcsec(val) 

71 elif my_qa.getunit(val) in ('', 'pixel'): 

72 return my_qa.getvalue(val)[0] * cell_size_arcsec 

73 else: 

74 raise ValueError("Invalid width %s" % str(val)) 

75 

76 def set_antenna(self, diam, blockage="0.0m", taper=10): 

77 """Set parameters to construct antenna beam. 

78 

79 antenna diameter and blockage 

80 taper: the illumination taper in dB 

81 """ 

82 # try quantity 

83 my_qa = quanta() 

84 self.antenna_diam_m = my_qa.getvalue(my_qa.convert(diam, "m"))[0] 

85 self.antenna_block_m = my_qa.getvalue(my_qa.convert(blockage, "m"))[0] 

86 self.taper = taper 

87 self.is_antenna_set = True 

88 

89 def set_sampling(self, intervals, pa="0deg"): 

90 """Set sampling interval of observation. 

91 

92 intervals: pointing inteval of observation (['10arcsec','10arcsec']) 

93 pa: position angle (NOT USED) 

94 """ 

95 self.pa = pa 

96 self.sampling_arcsec = [abs(a) for a in 

97 self.__to_arcsec_list(intervals)] 

98 self.is_sampling_set = True 

99 

100 def set_image_param(self, cell, ref_freq, gridfunction, 

101 convsupport, truncate, gwidth, jwidth, is_alma=False): 

102 """Set imaging parameters. 

103 

104 cell: image pixel size 

105 ref_freq: reference frequency of image to calculate beam size 

106 gridfunction, convsupport, truncate, gwidth, jwidth: 

107 parameters passed to imager 

108 is_alma: valid only for PB kernel to use 10.7m 

109 """ 

110 self.ref_freq = ref_freq 

111 self.cell_arcsec = [abs(a) for a in 

112 self.__to_arcsec_list(cell)] 

113 if gridfunction.upper() == "SF": 

114 self.__set_sf_kernel(convsupport) 

115 elif gridfunction.upper() == "GJINC": 

116 self.__set_gjinc_kernel(truncate, gwidth, jwidth) 

117 elif gridfunction.upper() == "GAUSS": 

118 self.__set_gauss_kernel(truncate, gwidth) 

119 elif gridfunction.upper() == "BOX": 

120 self.__set_box_kernel(self.cell_arcsec[0]) 

121 elif gridfunction.upper() == "PB": 

122 self.__set_pb_kernel(is_alma) 

123 self.is_kernel_set = True 

124 

125 def __set_sf_kernel(self, convsupport): 

126 """Set SF kernel parameter to the class.""" 

127 self.kernel_type = "SF" 

128 self.kernel_param = dict(convsupport=convsupport) 

129 

130 def __set_gjinc_kernel(self, truncate, gwidth, jwidth): 

131 """Set GJINC kernel parameter to the class.""" 

132 self.kernel_type = "GJINC" 

133 self.kernel_param = dict(truncate=truncate, gwidth=gwidth, jwidth=jwidth) 

134 

135 def __set_gauss_kernel(self, truncate, gwidth): 

136 """Set GAUSS kernel parameter to the class.""" 

137 self.kernel_type = "GAUSS" 

138 self.kernel_param = dict(truncate=truncate, gwidth=gwidth) 

139 

140 def __set_box_kernel(self, width): 

141 """Set BOX kernel parameter to the class.""" 

142 self.kernel_type = "BOX" 

143 self.kernel_param = dict(width=width) 

144 

145 def __set_pb_kernel(self, alma=False): 

146 """Set PB kernel parameter to the class.""" 

147 self.kernel_type = "PB" 

148 self.kernel_param = dict(alma=alma) 

149 

150 def get_beamsize_image(self): 

151 """Return FWHM of theoretical beam size in image. 

152 

153 The FWHM is derived by fitting gridded beam with Gaussian function. 

154 """ 

155 # assert necessary information is set 

156 self.__assert_antenna() 

157 self.__assert_kernel() 

158 self.__assert_sampling() 

159 casalog.post("Calculating theoretical beam size of the image") 

160 # construct theoretic beam for image 

161 axis, beam = self.get_antenna_beam() 

162 casalog.post( 

163 f"Length of convolution array={len(axis)}, " 

164 f"total width={axis[-1] - axis[0]} arcsec, " 

165 f"separation={axis[1] - axis[0]} arcsec", 

166 priority="DEBUG1") 

167 kernel = self.get_kernel(axis) 

168 sampling = self.get_box_kernel(axis, self.sampling_arcsec[0]) 

169 # convolution 

170 gridded = np.convolve(beam, kernel, mode='same') 

171 gridded /= max(gridded) 

172 result = np.convolve(gridded, sampling, mode='same') 

173 result /= max(result) 

174 # fwhm_arcsec = findFWHM(axis,result) 

175 fwhm_arcsec, dummy = self.gaussfit(axis, result, minlevel=0.0, truncate=False) 

176 # DEBUG MESSAGE 

177 casalog.post("- initial FWHM of beam = %f arcsec" % 

178 findFWHM(axis, beam)) 

179 casalog.post("- FWHM of gridding kernel = %f arcsec" % 

180 findFWHM(axis, kernel)) 

181 casalog.post("- FWHM of theoretical beam = %f arcsec" % 

182 findFWHM(axis, result)) 

183 casalog.post("- FWHM of theoretical beam (gauss fit) = %f arcsec" % 

184 fwhm_arcsec) 

185 ### 

186 del result 

187 if len(self.sampling_arcsec) == 1 and \ 

188 (len(self.cell_arcsec) == 1 or self.kernel_type != "BOX"): 

189 fwhm_geo_arcsec = fwhm_arcsec 

190 else: 

191 if len(self.sampling_arcsec) > 1: 

192 sampling = self.get_box_kernel(axis, self.sampling_arcsec[1]) 

193 elif self.kernel_type == "BOX" and len(self.cell_arcsec) > 1: 

194 kernel = self.get_box_kernel(axis, self.cell_arcsec[1]) 

195 gridded = np.convolve(beam, kernel, mode='same') 

196 gridded /= max(gridded) 

197 result = np.convolve(gridded, sampling, mode='same') 

198 result /= max(result) 

199 # fwhm1 = findFWHM(axis,result) 

200 fwhm1, _ = self.gaussfit(axis, result, minlevel=0.0, truncate=False) 

201 fwhm_geo_arcsec = np.sqrt(fwhm_arcsec * fwhm1) 

202 # DEBUG MESSAGE 

203 casalog.post("The second axis") 

204 casalog.post("- FWHM of gridding kernel = %f arcsec" % 

205 findFWHM(axis, kernel)) 

206 casalog.post("- FWHM of theoretical beam = %f arcsec" % 

207 findFWHM(axis, result)) 

208 casalog.post("- FWHM of theoretical beam (gauss fit) = %f arcsec" % 

209 fwhm1) 

210 del result 

211 # clear-up axes 

212 del axis, beam, kernel, sampling, gridded 

213 

214 return dict(major="%farcsec" % (fwhm_geo_arcsec), 

215 minor="%farcsec" % (fwhm_geo_arcsec), 

216 pa=self.pa) 

217 

218 def __assert_antenna(self): 

219 """Raise an error if antenna information is not set.""" 

220 if not self.is_antenna_set: 

221 raise RuntimeError("Antenna is not set") 

222 

223 def __assert_kernel(self): 

224 """Raise an error if imaging parameters are not set.""" 

225 if not self.is_kernel_set: 

226 raise RuntimeError("Kernel is not set.") 

227 

228 def __assert_sampling(self): 

229 """Raise an error if sampling information is not set.""" 

230 if not self.is_sampling_set: 

231 raise RuntimeError("Sampling information is not set") 

232 

233 def get_antenna_beam(self): 

234 """Return arrays of antenna beam response and it's horizontal axis. 

235 

236 Picked from AnalysisUtils (revision 1.2204, 2015/02/18) 

237 """ 

238 self.__assert_antenna() 

239 self.__assert_kernel() 

240 

241 fwhm_arcsec = primaryBeamArcsec(frequency=self.ref_freq, 

242 diameter=self.antenna_diam_m, 

243 taper=self.taper, showEquation=True, 

244 obscuration=self.antenna_block_m, 

245 fwhmfactor=None) 

246 truncate = False 

247 convolutionPixelSize = 0.02 # arcsec 

248 # avoid too coarse convolution array w.r.t. sampling 

249 if self.is_sampling_set and \ 

250 min(self.sampling_arcsec) < 5 * convolutionPixelSize: 

251 convolutionPixelSize = min(self.sampling_arcsec) / 5.0 

252 # avoid too fine convolution arrays. 

253 sizes = list(self.cell_arcsec) + [fwhm_arcsec] 

254 if self.is_sampling_set: 

255 sizes += list(self.sampling_arcsec) 

256 minsize = min(sizes) 

257 support_min = 1000. 

258 support_fwhm = 5000. 

259 if minsize > support_min * convolutionPixelSize: 

260 convolutionPixelSize = minsize / support_min 

261 if fwhm_arcsec > support_fwhm * convolutionPixelSize: 

262 convolutionPixelSize = min(fwhm_arcsec / support_fwhm, minsize / 10.) 

263 

264 if (self.taper < 0.1): 

265 # Airly disk 

266 return self.buildAiryDisk(fwhm_arcsec, 3., convolutionPixelSize, 

267 truncate=truncate, 

268 obscuration=self.antenna_block_m, 

269 diameter=self.antenna_diam_m) 

270 else: 

271 # Gaussian beam 

272 myxaxis = np.arange(-3 * fwhm_arcsec, 

273 3 * fwhm_arcsec + 0.5 * convolutionPixelSize, 

274 convolutionPixelSize) 

275 myfunction = self.gauss(myxaxis, [fwhm_arcsec, truncate]) 

276 return myxaxis, myfunction 

277 

278 def get_kernel(self, axis): 

279 """Return imaging kernel array.""" 

280 self.__assert_kernel() 

281 if self.kernel_type == "SF": 

282 # TODO: what to do for cell[0]!=cell[1]??? 

283 return self.get_sf_kernel(axis, self.kernel_param['convsupport'], 

284 self.cell_arcsec[0]) 

285 elif self.kernel_type == "GJINC": 

286 return self.get_gjinc_kernel(axis, self.kernel_param['truncate'], 

287 self.kernel_param['gwidth'], 

288 self.kernel_param['jwidth'], 

289 self.cell_arcsec[0]) 

290 elif self.kernel_type == "GAUSS": 

291 return self.get_gauss_kernel(axis, self.kernel_param['truncate'], 

292 self.kernel_param['gwidth'], 

293 self.cell_arcsec[0]) 

294 elif self.kernel_type == "BOX": 

295 return self.get_box_kernel(axis, self.kernel_param['width']) 

296 elif self.kernel_type == "PB": 

297 diam = self.antenna_diam_m 

298 if self.kernel_param['alma']: 

299 diam = 10.7 

300 casalog.post( 

301 f"Using effective antenna diameter {diam}m for " 

302 f"{self.kernel_type} kernel of ALMA antennas") 

303 epsilon = self.antenna_block_m / diam 

304 return self.get_pb_kernel(axis, diam, self.ref_freq, epsilon=epsilon) 

305 # return (self.rootAiryIntensity(axis, epsilon))**2 

306 else: 

307 raise RuntimeError("Invalid kernel: %s" % self.kernel_type) 

308 

309 def summary(self): 

310 """Print summary of parameters set to the class.""" 

311 casalog.post("=" * 40) 

312 casalog.post("Summary of Image Beam Parameters") 

313 casalog.post("=" * 40) 

314 casalog.post("[Antenna]") 

315 self.__antenna_summary() 

316 

317 casalog.post("\n[Imaging Parameters]") 

318 self.__kernel_summary() 

319 

320 casalog.post("\n[Sampling]") 

321 self.__sampling_summary() 

322 

323 def __notset_message(self): 

324 casalog.post("Not set.") 

325 

326 def __antenna_summary(self): 

327 """Print summary of antenna setup.""" 

328 if not self.is_antenna_set: 

329 self.__notset_message() 

330 return 

331 casalog.post("diameter: %f m" % (self.antenna_diam_m)) 

332 casalog.post("blockage: %f m" % (self.antenna_block_m)) 

333 

334 def __kernel_summary(self): 

335 """Print summary of imaging parameter setup.""" 

336 if not self.is_kernel_set: 

337 self.__notset_message() 

338 return 

339 casalog.post("reference frequency: %s" % str(self.ref_freq)) 

340 casalog.post("cell size: %s arcsec" % str(self.cell_arcsec)) 

341 casalog.post("kernel type: %s" % self.kernel_type) 

342 for key, val in self.kernel_param.items(): 

343 casalog.post("%s: %s" % (key, str(val))) 

344 

345 def __sampling_summary(self): 

346 """Print summary of sampling setup.""" 

347 if not self.is_sampling_set: 

348 self.__notset_message() 

349 return 

350 casalog.post("sampling interval: %s arcsec" % str(self.sampling_arcsec)) 

351 casalog.post("position angle: %s" % (self.pa)) 

352 

353 # 

354 # Construct Kernel arrays 

355 # 

356 

357 # 

358 # BOX 

359 # 

360 def get_box_kernel(self, axis, width): 

361 """Return a box kernel array with specified width. 

362 

363 axis: an array of xaxis values 

364 width: kernel width 

365 output array 

366 out[i] = 1.0 (-width/2.0 <= x[i] <= width/2.0) 

367 = 0.0 (else) 

368 """ 

369 data = np.zeros(len(axis)) 

370 indices = np.where(abs(axis) <= width / 2.0) 

371 data[indices] = 1.0 

372 return data 

373 

374 # 

375 # SF 

376 # 

377 def get_sf_kernel(self, axis, convsupport, cell_size): 

378 """Return spheroidal kernel array. 

379 

380 axis: an array of xaxis values 

381 convsupport: the truncation radius of SF kernel in pixel unit. 

382 cell_size: image pixel size 

383 

384 Modified version of one in AnalysisUtils.sfBeamPredict (revision 1.2204, 2015/02/18) 

385 """ 

386 convsupport = 3 if convsupport == -1 else convsupport 

387 supportwidth = (convsupport * 1.0 + 0.0) 

388 # value obtained by matching Fred's grdsf.f output with scipy(m=0,n=0) 

389 c = 5.356 * np.pi / 2.0 

390 sfaxis = axis / float(supportwidth * cell_size * 1.0) 

391 indices = np.where(abs(sfaxis) < 1)[0] 

392 centralRegion = sfaxis[indices] 

393 centralRegionY = self.spheroidalWaveFunction(centralRegion, 0, 0, c, 1) 

394 mysf = np.zeros(len(axis)) 

395 mysf[indices] += centralRegionY / max(centralRegionY) 

396 return mysf 

397 

398 def spheroidalWaveFunction(self, x, m=0, n=0, c=0, alpha=0): 

399 """Return spheroidal wave function. 

400 

401 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

402 """ 

403 if (type(x) != list and type(x) != np.ndarray): 

404 returnScalar = True 

405 x = [x] 

406 else: 

407 returnScalar = False 

408 cv = scipy.special.pro_cv(m, n, c) # get the eigenvalue 

409 result = scipy.special.pro_ang1_cv(m, n, c, cv, x)[0] 

410 for i in range(len(x)): 

411 nu = x[i] # (i-0.5*len(x))/(0.5*len(x)) # only true if x is symmetric about zero 

412 result[i] *= (1 - nu**2)**alpha 

413 # The peak of this function is about 10000 for m=0,n=0,c=6 

414 if (returnScalar): 

415 return result[0] 

416 else: 

417 return result 

418 

419 # 

420 # GAUSS 

421 # 

422 def get_gauss_kernel(self, axis, truncate, gwidth, cell_arcsec): 

423 """Return a gaussian kernel array. 

424 

425 axis : an array of xaxis values 

426 truncate : truncation radius 

427 NOTE definition is different from truncate in gauss()! 

428 gwidth : kernel gwidth 

429 cell_arcsec : image pixel size in unit of arcsec 

430 """ 

431 if gwidth == -1: 

432 gwidth = np.sqrt(np.log(2.0)) 

433 gwidth_arcsec = self.__parse_width(gwidth, cell_arcsec) 

434 # get gauss for full axis 

435 result = self.gauss(axis, [gwidth_arcsec]) 

436 # truncate kernel outside the truncation radius 

437 if truncate == -1: 

438 trunc_arcsec = gwidth_arcsec * 1.5 

439 elif truncate is not None: 

440 trunc_arcsec = self.__parse_width(truncate, cell_arcsec) 

441 idx = np.where(abs(axis) > trunc_arcsec) 

442 result[idx] = 0. 

443 return result 

444 

445 def gauss(self, x, parameters): 

446 """Compute the value of the Gaussian function. 

447 

448 Compute the value of the Gaussian function at the specified 

449 location(s) with respect to the peak (which is assumed to be at x=0). 

450 truncate: if not None, then set result to zero if below this value. 

451 -Todd Hunter 

452 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

453 """ 

454 if (type(parameters) != np.ndarray and type(parameters) != list): 

455 parameters = np.array([parameters]) 

456 if (len(parameters) < 2): 

457 parameters = np.array([parameters[0], 0]) 

458 fwhm = parameters[0] 

459 x = np.asarray(x, dtype=np.float64) 

460 sigma = fwhm / 2.3548201 

461 result = np.exp(-(x**2 / (2.0 * sigma**2))) 

462 idx = np.where(result < parameters[1])[0] 

463 result[idx] = 0 

464 return result 

465 

466 # 

467 # GJINC 

468 # 

469 def get_gjinc_kernel(self, axis, truncate, gwidth, jwidth, cell_arcsec): 

470 """Return a GJinc kernel array. 

471 

472 axis : an array of xaxis values 

473 truncate : truncation radius (NOT SUPPORTED YET) 

474 gwidth jwidth : kernel gwidth 

475 cell_arcsec : image pixel size in unit of arcsec 

476 """ 

477 if gwidth == -1: 

478 gwidth = 2.52 * np.sqrt(np.log(2.0)) 

479 if jwidth == -1: 

480 jwidth = 1.55 

481 gwidth_arcsec = self.__parse_width(gwidth, cell_arcsec) 

482 jwidth_arcsec = self.__parse_width(jwidth, cell_arcsec) 

483 mygjinc = self.trunc(self.gjinc(axis, gwidth=gwidth_arcsec, 

484 jwidth=jwidth_arcsec, 

485 useCasaJinc=True, normalize=False)) 

486 return mygjinc 

487 

488 def gjinc(self, x, gwidth, jwidth, useCasaJinc=False, normalize=False): 

489 """Migrated from AnalysisUtils (revision 1.2204, 2015/02/18).""" 

490 if (useCasaJinc): 

491 result = self.grdjinc1(x, jwidth, normalize) * self.gjincGauss(x, gwidth) 

492 else: 

493 result = self.jinc(x, jwidth) * self.gjincGauss(x, gwidth) 

494 return result 

495 

496 def grdjinc1(self, val, c, normalize=True): 

497 """Migrated from AnalysisUtils (revision 1.2204, 2015/02/18).""" 

498 # Casa's function 

499 # // Calculate J_1(x) using approximate formula 

500 xs = np.pi * val / c 

501 result = [] 

502 for x in xs: 

503 x = abs(x) # I added this to make it symmetric 

504 ax = abs(x) 

505 if (ax < 8.0): 

506 y = x * x 

507 ans1 = x * (72362614232.0 + y * 

508 (-7895059235.0 + y * (242396853.1 + y * 

509 (-2972611.439 + y * (15704.48260 + y * (-30.16036606)))))) 

510 ans2 = 144725228442.0 + y * (2300535178.0 + y * 

511 (18583304.74 + y * (99447.43394 + y * 

512 (376.9991397 + y * 1.0)))) 

513 ans = ans1 / ans2 

514 else: 

515 z = 8.0 / ax 

516 y = z * z 

517 xx = ax - 2.356194491 

518 ans1 = 1.0 + y * (0.183105e-2 + y * 

519 (-0.3516396496e-4 + y * 

520 (0.2457520174e-5 + y * (-0.240337019e-6)))) 

521 ans2 = 0.04687499995 + y * (-0.2002690873e-3 + y * 

522 (0.8449199096e-5 + y * 

523 (-0.88228987e-6 + y * (0.105787412e-6)))) 

524 ans = sqrt(0.636619772 / ax) * (cos(xx) * ans1 - z * sin(xx) * ans2) 

525 if (x < 0.0): 

526 ans = -ans 

527 if (x == 0.0): 

528 out = 0.5 

529 else: 

530 out = ans / x 

531 if (normalize): 

532 out = out / 0.5 

533 result.append(out) 

534 return(result) 

535 

536 def jinc(self, x, jwidth): 

537 """Compute JINC value. 

538 

539 The peak of this function is 0.5. 

540 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

541 """ 

542 argument = np.pi * np.abs(x) / jwidth 

543 np.seterr(invalid='ignore') # prevent warning for central point 

544 result = scipy.special.j1(argument) / argument 

545 np.seterr(invalid='warn') 

546 for i in range(len(x)): 

547 if (abs(x[i]) < 1e-8): 

548 result[i] = 0.5 

549 return result 

550 

551 def gjincGauss(self, x, gwidth): 

552 return (np.exp(-np.log(2) * (x / float(gwidth))**2)) 

553 

554 # 

555 # Airly disk 

556 # 

557 def get_pb_kernel(self, axis, diam, ref_freq, epsilon=0.0): 

558 """Return Airy Disk array. 

559 

560 Return Airy Disk array defined by the axis, diameter, reference frequency 

561 and ratio of central hole and antenna diameter 

562 

563 axis: x-axis values 

564 diameter: antenna diameter in unit of m 

565 reference frequency: the reference frequency of the image 

566 epsilon: ratio of central hole diameter to antenna diameter 

567 """ 

568 a = (self.rootAiryIntensity(axis, epsilon))**2 

569 airyfwhm = findFWHM(axis, a) 

570 fwhm = primaryBeamArcsec(frequency=ref_freq, diameter=diam, 

571 taper=self.taper, showEquation=False, 

572 obscuration=diam * epsilon, 

573 fwhmfactor=None) 

574 ratio = fwhm / airyfwhm 

575 tempaxis = axis / ratio 

576 a = self.rootAiryIntensity(tempaxis, epsilon) 

577 return a**2 

578 

579 def buildAiryDisk(self, fwhm, xaxisLimitInUnitsOfFwhm, convolutionPixelSize, 

580 truncate=False, obscuration=0.75, diameter=12.0): 

581 """Build airy disk. 

582 

583 This function computes the Airy disk (with peak of 1.0) across a grid of points 

584 specified in units of the FWHM of the disk. 

585 fwhm: a value in arcsec 

586 xaxisLimitInUnitsOfFwhm: an integer or floating point unitless value 

587 truncate: if True, truncate the function at the first null (on both sides) 

588 obscuration: central hole diameter (meters) 

589 diameter: dish diameter (meters) 

590 obscuration and diameter are used to compute the blockage ratio (epsilon) 

591 and its effect on the pattern 

592 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

593 """ 

594 epsilon = obscuration / diameter 

595 # casalog.post("Using epsilon = %f" % (epsilon)) 

596 myxaxis = np.arange(-xaxisLimitInUnitsOfFwhm * fwhm, 

597 xaxisLimitInUnitsOfFwhm * fwhm + 0.5 * convolutionPixelSize, 

598 convolutionPixelSize) 

599 a = (self.rootAiryIntensity(myxaxis, epsilon))**2 

600 # Scale the Airy disk to the desired FWHM, and recompute on finer grid 

601 airyfwhm = findFWHM(myxaxis, a) 

602 ratio = fwhm / airyfwhm 

603 myxaxis = np.arange(-xaxisLimitInUnitsOfFwhm * fwhm / ratio, 

604 (xaxisLimitInUnitsOfFwhm * fwhm + 0.5 * convolutionPixelSize) / ratio, 

605 convolutionPixelSize / ratio) 

606 a = self.rootAiryIntensity(myxaxis, epsilon) 

607 if (truncate): 

608 a = self.trunc(a) 

609 a = a**2 

610 myxaxis *= ratio 

611 return(myxaxis, a) 

612 

613 def rootAiryIntensity(self, myxaxis, epsilon=0.0, showplot=False): 

614 """Compute the value of 2*J1(x)/x. 

615 

616 This function computes 2*J1(x)/x, which can be squared to get an Airy disk. 

617 myxaxis: the x-axis values to use 

618 epsilon: radius of central hole in units of the dish diameter 

619 Migrated from AnalysisUtils.py (revision 1.2204, 2015/02/18) 

620 """ 

621 if (epsilon > 0): 

622 a = (2 * spspec.j1(myxaxis) / myxaxis - 

623 epsilon**2 * 2 * spspec.j1(myxaxis * epsilon) / 

624 (epsilon * myxaxis)) / (1 - epsilon**2) 

625 else: 

626 a = 2 * spspec.j1(myxaxis) / myxaxis # simpler formula for epsilon=0 

627 return(a) 

628 

629 def trunc(self, result): 

630 """Truncate a list at the first null. 

631 

632 Truncates a list at the first null on both sides of the center, 

633 starting at the center and moving outward in each direction. 

634 Assumes the list is positive in the center, e.g. a Gaussian beam. 

635 -Todd Hunter 

636 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

637 """ 

638 # casa default truncate=-1, which means truncate at radius of first null 

639 mask = np.zeros(len(result)) 

640 truncateBefore = 0 

641 truncateBeyond = len(mask) 

642 for r in range(len(result) // 2, len(result)): 

643 if (result[r] < 0): 

644 truncateBeyond = r 

645 break 

646 for r in range(len(result) // 2, 0, -1): 

647 if (result[r] < 0): 

648 truncateBefore = r 

649 break 

650 mask[truncateBefore:truncateBeyond] = 1 

651 # casalog.post( 

652 # "Truncating outside of pixels %d-%d (len=%d)" % 

653 # (truncateBefore,truncateBeyond-1,len(mask))) 

654 result *= mask 

655 return result 

656 

657 def gaussfit_errfunc(self, parameters, x, y): 

658 """Migrated from AnalysisUtils (revision 1.2204, 2015/02/18).""" 

659 return (y - self.gauss(x, parameters)) 

660 

661 def gaussfit(self, x, y, showplot=False, minlevel=0, verbose=False, 

662 title=None, truncate=False): 

663 """Perform 1D Gaussian fit. 

664 

665 Fits a 1D Gaussian assumed to be centered at x=0 with amp=1 to the 

666 specified data, with an option to truncate it at some level. 

667 Returns the FWHM and truncation point. 

668 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

669 """ 

670 fwhm_guess = findFWHM(x, y) 

671 if (truncate == False): 

672 parameters = np.asarray([fwhm_guess], dtype=np.float64) 

673 else: 

674 parameters = np.asarray([fwhm_guess, truncate], dtype=np.float64) 

675 if (verbose): 

676 casalog.post("Fitting for %d parameters: guesses = %s" % (len(parameters), parameters)) 

677 xx = np.asarray(x, dtype=np.float64) 

678 yy = np.asarray(y, dtype=np.float64) 

679 lenx = len(x) 

680 if (minlevel > 0): 

681 xwidth = findFWHM(x, y, minlevel) 

682 xx = x[np.where(np.abs(x) < xwidth * 0.5)[0]] 

683 yy = y[np.where(np.abs(x) < xwidth * 0.5)[0]] 

684 if (verbose): 

685 casalog.postt("Keeping %d/%d points, guess = %f arcsec" % 

686 (len(x), lenx, fwhm_guess)) 

687 result = optimize.leastsq(self.gaussfit_errfunc, parameters, args=(xx, yy), 

688 full_output=1) 

689 bestParameters = result[0] 

690 infodict = result[2] 

691 numberFunctionCalls = infodict['nfev'] 

692 mesg = result[3] 

693 ier = result[4] 

694 if (verbose): 

695 casalog.post("optimize.leastsq: ier=%d, #calls=%d, message = %s" % 

696 (ier, numberFunctionCalls, mesg)) 

697 if (type(bestParameters) == list or type(bestParameters) == np.ndarray): 

698 fwhm = bestParameters[0] 

699 if verbose: 

700 casalog.post("fitted FWHM = %f" % (fwhm)) 

701 if (truncate != False): 

702 truncate = bestParameters[1] 

703 casalog.post("optimized truncation = %f" % (truncate)) 

704 else: 

705 fwhm = bestParameters 

706 return(fwhm, truncate) 

707 

708 

709def findFWHM(x, y, level=0.5, s=0): 

710 """Measure the FWHM of the specified profile. 

711 

712 This works well in a noise-free environment. 

713 The data are assumed to be sorted by the x variable. 

714 x: the position variable 

715 y: the intensity variable 

716 level: the signal level for which to find the full width 

717 s: see help scipy.interpolate.UnivariateSpline 

718 -Todd Hunter 

719 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

720 """ 

721 halfmax = np.max(y) * level 

722 spline = scipy.interpolate.UnivariateSpline(x, y - halfmax, s=s) 

723 result = spline.roots() 

724 if (len(result) == 2): 

725 x0, x1 = result 

726 return(abs(x1 - x0)) 

727 elif (len(result) == 1): 

728 return(2 * abs(result[0])) 

729 else: 

730 # modified (KS 2015/02/19) 

731 # casalog.post( 

732 # f"More than two crossings ({len(result)}), " 

733 # "fitting slope to points near that power level." 

734 # ) 

735 # result = 2*findZeroCrossingBySlope(x, y-halfmax) 

736 # return(result) 

737 errmsg = "Unsupported FWHM search in CASA. " + \ 

738 f"More than two corssings ({len(result)}) at level {halfmax} ({level} % of peak)." 

739 raise Exception(errmsg) 

740 

741 

742def primaryBeamArcsec(frequency, diameter, obscuration, taper, 

743 showEquation=True, use2007formula=True, fwhmfactor=None): 

744 """Implement the Baars formula: b*lambda / D. 

745 

746 if use2007formula==False, use the formula from ALMA Memo 456 

747 if use2007formula==True, use the formula from Baars 2007 book 

748 (see au.baarsTaperFactor) 

749 In either case, the taper value is expected to be entered as positive. 

750 Note: if a negative value is entered, it is converted to positive. 

751 The effect of the central obstruction on the pattern is also accounted for 

752 by using a spline fit to Table 10.1 of Schroeder's Astronomical Optics. 

753 fwhmfactor: if given, then ignore the taper 

754 For further help and examples, see https://safe.nrao.edu/wiki/bin/view/ALMA/PrimaryBeamArcsec 

755 -- Todd Hunter 

756 Simplified version of the one in AnalysisUtils (revision 1.2204, 2015/02/18) 

757 """ 

758 if (fwhmfactor is not None): 

759 taper = effectiveTaper(fwhmfactor, diameter, obscuration, use2007formula) 

760 if (taper is None): 

761 return 

762 if (taper < 0): 

763 taper = abs(taper) 

764 if (obscuration > 0.4 * diameter): 

765 casalog.post( 

766 "This central obscuration is too large for the method of calculation employed here.") 

767 return 

768 if (type(frequency) == str): 

769 my_qa = quanta() 

770 frequency = my_qa.getvalue(my_qa.convert(frequency, "Hz"))[0] 

771 lambdaMeters = 2.99792458e8 / frequency 

772 b = baarsTaperFactor(taper, use2007formula) * centralObstructionFactor(diameter, obscuration) 

773 if (showEquation): 

774 if (use2007formula): 

775 formula = "Baars (2007) Eq 4.13" 

776 else: 

777 formula = "ALMA memo 456 Eq. 18" 

778 casalog.post( 

779 f"Coefficient from {formula} for a -{taper:.1f}dB edge taper " 

780 f"and obscuration ratio={obscuration:g}/{diameter:g} = {b:.3f}*lambda/D") 

781 return(b * lambdaMeters * 3600 * 180 / (diameter * np.pi)) 

782 

783 

784def effectiveTaper(fwhmFactor=1.16, diameter=12, obscuration=0.75, 

785 use2007formula=True): 

786 """Compute effective taper from constant factor for illumination pattern. 

787 

788 The inverse of (Baars formula multiplied by the central 

789 obstruction factor). Converts an observed value of the constant X in 

790 the formula FWHM=X*lambda/D into a taper in dB (positive value). 

791 if use2007formula == False, use Equation 18 from ALMA Memo 456 

792 if use2007formula == True, use Equation 4.13 from Baars 2007 book 

793 -- Todd Hunter 

794 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

795 """ 

796 cOF = centralObstructionFactor(diameter, obscuration) 

797 if (fwhmFactor < 1.02 or fwhmFactor > 1.22): 

798 casalog.post("Invalid fwhmFactor (1.02<fwhmFactor<1.22)") 

799 return 

800 if (baarsTaperFactor(10, use2007formula) * cOF < fwhmFactor): 

801 increment = 0.01 

802 for taper_dB in np.arange(10, 10 + increment * 1000, increment): 

803 if (baarsTaperFactor(taper_dB, use2007formula) * cOF - fwhmFactor > 0): 

804 break 

805 else: 

806 increment = -0.01 

807 for taper_dB in np.arange(10, 10 + increment * 1000, increment): 

808 if (baarsTaperFactor(taper_dB, use2007formula) * cOF - fwhmFactor < 0): 

809 break 

810 return(taper_dB) 

811 

812 

813def baarsTaperFactor(taper_dB, use2007formula=True): 

814 """Convert a taper to the constant factor for illumination pattern. 

815 

816 Converts a taper in dB to the constant X 

817 in the formula FWHM=X*lambda/D for the parabolic illumination pattern. 

818 We assume that taper_dB comes in as a positive value. 

819 use2007formula: False --> use Equation 18 from ALMA Memo 456. 

820 True --> use Equation 4.13 from Baars 2007 book 

821 - Todd Hunter 

822 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

823 """ 

824 tau = 10**(-0.05 * taper_dB) 

825 if (use2007formula): 

826 return(1.269 - 0.566 * tau + 0.534 * (tau**2) - 0.208 * (tau**3)) 

827 else: 

828 return(1.243 - 0.343 * tau + 0.12 * (tau**2)) 

829 

830 

831def centralObstructionFactor(diameter=12.0, obscuration=0.75): 

832 """Compute the scale factor of an Airy pattern. 

833 

834 Computes the scale factor of an Airy pattern as a function of the 

835 central obscuration, using Table 10.1 of Schroeder's 'Astronomical Optics'. 

836 -- Todd Hunter 

837 Migrated from AnalysisUtils (revision 1.2204, 2015/02/18) 

838 """ 

839 epsilon = obscuration / diameter 

840 myspline = scipy.interpolate.UnivariateSpline( 

841 [0, 0.1, 0.2, 0.33, 0.4], [1.22, 1.205, 1.167, 1.098, 1.058], s=0) 

842 factor = myspline(epsilon) / 1.22 

843 if (type(factor) == np.float64): 

844 # casapy 4.2 

845 return(factor) 

846 else: 

847 # casapy 4.1 and earlier 

848 return(factor[0])