Source code for orca.utils.calibrationutils

import numpy as np
from os import path
import math, shutil
from datetime import datetime
from astropy.time import Time
from astropy.coordinates import SkyCoord, ICRS
from orca.utils.coordutils import CYG_A, OVRO_LWA_LOCATION, is_visible, get_altaz_at_ovro
from orca.utils.beam import beam
from casacore import tables
from casacore import measures
from casatools import componentlist


[docs]def calibration_time_range(utc_times_txt_path: str, start_time: datetime, end_time: datetime, duration_min: float = 20): """Get dada file names based on Cygnus A transit for calibration. Get list of .dada file names to use for calibration. Selects .dada files that span {duration_min} centered on transit of Cygnus A. Args: utc_times_txt_path: Path to utc_times.txt file. start_time: Start time of data for which to derive calibration tables, datetime format. end_time: End time of data for which to derive calibration tables, datetime format. duration_min: In minutes, amount of time used for calibration. Default is 20 min. Returns: cal_start_time and cal_end_time covering {duration_min} calibration range, in datetime format. """ # If the utc_times.txt file contains multiple transits of Cygnus A, will select the # first transit. # Get array of UTC vals from {utc_times_txt_path}. utctimes, dadafiles = np.genfromtxt(utc_times_txt_path, delimiter=' \t ', \ dtype='str', unpack=True) # Convert {utctimes} to array of LSTs and MJDs lsttimes = Time(utctimes, scale='utc').sidereal_time('apparent',OVRO_LWA_LOCATION.lon) mjdtimes = Time(utctimes, scale='utc').mjd # Select {duration_min} range of LSTs where CygA is closest to zenith CygA_HA = CYG_A.ra.hourangle CygA_HA_start = CygA_HA - duration_min/2./60. CygA_HA_stop = CygA_HA + duration_min/2./60. centermjd = Time(datetime.fromtimestamp( (start_time.timestamp() + end_time.timestamp()) / 2. )).mjd rel_starttimes = np.abs(lsttimes.value - CygA_HA_start) + np.abs(mjdtimes - centermjd) rel_stoptimes = np.abs(lsttimes.value - CygA_HA_stop) + np.abs(mjdtimes - centermjd) cyga_start_ind = list(rel_starttimes).index(min(rel_starttimes)) cyga_stop_ind = list(rel_stoptimes).index(min(rel_stoptimes)) # cal_start_time = Time(utctimes[cyga_start_ind-1]).to_datetime() cal_end_time = Time(utctimes[cyga_stop_ind+1]).to_datetime() # return cal_start_time, cal_end_time
[docs]def gen_model_ms_stokes(ms: str, zest: bool = False): """ Generate component lists for calibration / polarized peeling in CASA. Currently only includes Cas A & Cyg A. Args: ms: Measurement set to generate model for. zest: For supplying component lists for polarized peeling. Default is False. Returns: Returns path to component list(s). If zest=True, will return a list of paths to single-source component lists. """ src_list = [{'label': 'CasA', 'flux': 16530, 'alpha': -0.72, 'ref_freq': 80.0, 'position': 'J2000 23h23m24s +58d48m54s'}, {'label': 'CygA', 'flux': 16300, 'alpha': -0.58, 'ref_freq': 80.0, 'position': 'J2000 19h59m28.35663s +40d44m02.0970s'}] with tables.table(ms, ack=False) as t: t0 = t.getcell('TIME', 0) me = measures.measures() time = me.epoch('UTC', '%fs' % t0) timeT = Time(time['m0']['value'], format='mjd', scale='utc') timeT.format = 'datetime' utctime = timeT.value lsttime = timeT.sidereal_time('apparent', OVRO_LWA_LOCATION.lon).value freq = float(tables.table(ms+'/SPECTRAL_WINDOW', ack=False).getcell('NAME', 0))/1.e6 # outbeam = beam(ms) cal_srcs = [] for s,src in enumerate(src_list): src_position: str = src.get('position') # type: ignore ra = src_position.split(' ')[1] dec = src_position.split(' ')[2] if is_visible(SkyCoord(ra, dec, frame=ICRS), utctime) and \ (not zest or (zest and (lsttime < 8 or lsttime > 13))): altaz = get_altaz_at_ovro(SkyCoord(ra, dec, frame=ICRS), utctime) scale = np.array(outbeam.srcIQUV(altaz.az.deg, altaz.alt.deg)) src_list[s]['Stokes'] = list(_flux80_47(src['flux'], src['alpha'], freq, src['ref_freq']) * scale) cal_srcs.append(src_list[s]) if not cal_srcs: return cl = componentlist() if zest: fluxIvals = [src.get('Stokes')[0] for src in cal_srcs] # type: ignore sortedinds = np.argsort(fluxIvals)[::-1] cllist = [] counter = 0 for ind in sortedinds: src = cal_srcs[ind] clname = '%s_%d.cl' % (path.splitext(path.abspath(ms))[0],counter) try: shutil.rmtree(clname) except OSError: pass cl.done() cl.addcomponent(flux=src['Stokes'], polarization='Stokes', dir=src['position'], label=src['label']) cl.setstokesspectrum(which=0, type='spectral index', index=[src['alpha'], 0, 0, 0], reffreq='%sMHz' % freq) cl.rename(clname) cl.done() cllist.append(clname) counter+=1 return cllist else: cl.done() clname = '%s.cl' % path.splitext(path.abspath(ms))[0] try: shutil.rmtree(clname) except OSError: pass for s,src in enumerate(cal_srcs): cl.addcomponent(flux=src['Stokes'], polarization='Stokes', dir=src['position'], label=src['label']) cl.setstokesspectrum(which=s, type='spectral index', index=[src['alpha'], 0, 0, 0], reffreq='%sMHz' % freq) cl.rename(clname) cl.done() return clname
[docs]def gen_model_from_dict(ms: str, npzfile: str): """ """ Ateamdict = np.load(npzfile) sourcename_list = Ateamdict['sourcename'] source_Vflux_list = Ateamdict['pkflux'] source_ra_list = Ateamdict['ra_pos'] source_dec_list = Ateamdict['dec_pos'] freq_val = Ateamdict['frqval'] src_list = [{'label': 'VirA', 'flux': '2400', 'alpha': -0.86, 'ref_freq': 80.0, 'position': 'J2000 12h30m49.42338s +12d23m28.0439s'}, {'label': 'TauA', 'flux': '1770', 'alpha': -0.27, 'ref_freq': 80.0, 'position': 'J2000 05h34m31.94s +22d00m52.2s'}, {'label': 'CasA', 'flux': 16530, 'alpha': -0.72, 'ref_freq': 80.0, 'position': 'J2000 23h23m24s +58d48m54s'}] freq = freq_val/1.e6 sub_srcs = [] for s,src in enumerate(src_list): if not np.isnan(source_Vflux_list[s]) and not np.isnan(source_ra_list[s]) and not np.isnan(source_dec_list[s]): src_list[s]['Stokes'] = [0, 0, 0, -source_Vflux_list[s]] new_pos = SkyCoord(source_ra_list[s], source_dec_list[s], frame='icrs', unit='deg') src_list[s]['position'] = f'J2000 {new_pos.to_string("hmsdms")}' sub_srcs.append(src_list[s]) if not sub_srcs: return cl = componentlist() cl.done() clname = '%s.cl' % path.splitext(path.abspath(ms))[0] try: shutil.rmtree(clname) except OSError: pass for s,src in enumerate(sub_srcs): cl.addcomponent(flux=src['Stokes'], polarization='Stokes', dir=src['position'], label=src['label']) cl.setstokesspectrum(which=s, type='spectral index', index=[src['alpha'], 0, 0, 0], reffreq='%sMHz' % freq) cl.rename(clname) cl.done() return clname
def _flux80_47(flux_hi, sp, output_freq, ref_freq): # given a flux at 80 MHz and a sp_index, # return the flux at MS center-frequency. return flux_hi * 10 ** (sp * math.log(output_freq/ref_freq, 10))