"""Peeling related transforms
"""
import logging
from orca.utils.sourcemodels import RFI_B, CYG_A_UNPOLARIZED_RESOLVED, CAS_A_UNPOLARIZED_RESOLVED
from orca.wrapper import ttcal
from typing import List, Optional
from datetime import datetime
import tempfile
from os import path
import json
import numpy as np
import shutil
import casacore.tables as tables
from orca.utils import coordutils
from orca.utils.calibrationutils import gen_model_ms_stokes
from casatasks import ft, bandpass, polcal, applycal
log = logging.getLogger(__name__)
CORRECTED_DATA = 'CORRECTED_DATA'
DATA = 'DATA'
[docs]def ttcal_peel_from_data_to_corrected_data(ms: str, utc_time: datetime, include_rfi_source: bool = True) -> str:
""" Use TTCal to peel. Read from DATA column and write to CORRECTED_DATA
If the CORRECTED_DATA column exists, it does not do anything.
Args:
ms: Path to measurement set.
utc_time: datetime object to figure out what sources are up.
include_rfi_source: Include near-field generic RFI sources in peel.
Returns: The output measurement set (which is the same thing as the input measurement set).
"""
with tables.table(ms, readonly=False) as t:
# Copied from https://github.com/casacore/python-casacore/blob/master/casacore/tables/msutil.py#L48
column_names = t.colnames()
if CORRECTED_DATA not in column_names:
dminfo = t.getdminfo(DATA)
cdesc = t.getcoldesc(DATA)
dminfo['NAME'] = 'correcteddata'
cdesc['comment'] = 'The corrected data column'
t.addcols(tables.maketabdesc(tables.makecoldesc(CORRECTED_DATA, cdesc)), dminfo)
t.putcol(CORRECTED_DATA, t.getcol(DATA))
else:
log.info(f'{ms} already has {CORRECTED_DATA} column. Not peeling.')
return ms
log.info(f'Generating sources.json for {ms}')
with tempfile.TemporaryDirectory() as tmpdir:
sources_json = path.join(tmpdir, 'sources.json')
if _write_peeling_sources_json(utc_time, sources_json, include_rfi_source=include_rfi_source):
# This reads from the just-created CORRECTED_DATA column and writes to CORRECTED_DATA column.
ttcal.peel_with_ttcal(ms, sources_json)
return ms
def _write_peeling_sources_json(utc_timestamp: datetime, out_json: str, include_rfi_source: bool) -> Optional[str]:
sources = _get_peeling_sources_list(utc_timestamp, include_rfi_source)
if sources:
log.info(f'{len(sources)} sources to peel for {utc_timestamp.isoformat()}.')
with open(out_json, 'w') as out_file:
json.dump(sources, out_file)
return out_json
else:
log.info(f'No sources to peel for {utc_timestamp.isoformat()}')
return None
def _get_peeling_sources_list(utc_timestamp: datetime, include_rfi_source: bool) -> List[dict]:
sources = []
if coordutils.is_visible(coordutils.CYG_A, utc_timestamp):
sources.append(
CYG_A_UNPOLARIZED_RESOLVED)
if coordutils.is_visible(coordutils.CAS_A, utc_timestamp):
sources.append(
CAS_A_UNPOLARIZED_RESOLVED
)
if include_rfi_source:
sources.append(
RFI_B
)
return sources
[docs]def zest_with_casa(ms: str, reverse: bool = False):
"""Polarized peeling with CASA.
Currently, peeling list will only include at most CasA & CygA.
The peeled visibilities will be placed in CORRECTED_DATA.
Args:
ms: The measurement set.
reverse: Reverse the peeling process. Default is False.
Returns:
Path to measurement set that was modified.
"""
cllist = gen_model_ms_stokes(ms, zest=True)
if cllist is None: # No sources to zest, so exit.
return ms
for srcind, clfile in enumerate(cllist):
dcalfile = path.splitext(path.abspath(ms))[0]+'_'+str(srcind)+'.dcal'
bcalfile = path.splitext(path.abspath(ms))[0]+'_'+str(srcind)+'.bcal'
bcalfileflags = path.splitext(path.abspath(bcalfile))[0]+'_flags.bcal'
modelmsfile = path.splitext(path.abspath(ms))[0]+'_'+str(srcind)+'_models.ms'
# move calibrated data from CORRECTED_DATA into DATA column
t = tables.table(ms, readonly=False)
corrected_data = t.getcol('CORRECTED_DATA')
t.putcol('DATA',corrected_data)
t.close()
# FT component list
ft(ms, complist=clfile, usescratch=True)
# bandpass solve then polcal solve with poltype='Df'
bandpass(ms, bcalfile, refant='34', uvrange='>15lambda', solint='inf,12ch')
polcal(ms, dcalfile, refant='', uvrange='>15lambda', poltype='Df', gaintable=[bcalfile], solint='inf,12ch')
# reorder both leakage and gain term tables
_cal_reverse(bcalfile,dcalfile)
# put MODEL_DATA into DATA column of new measurement set,
# to run applycal like normal with gains
tables.tablecopy(ms,modelmsfile)
_model_to_data(modelmsfile)
applycal(modelmsfile, gaintable=[bcalfile], flagbackup=False)
applycal(ms, gaintable=[bcalfileflags], flagbackup=False)
# Reorder model measurement set for applying leakage gains
msfilemodels_XX, msfilemodels_XY, msfilemodels_YX, msfilemodels_YY = _model_data_rearrange(modelmsfile)
applycal(msfilemodels_XY, gaintable=[dcalfile], flagbackup=False)
applycal(msfilemodels_YX, gaintable=[dcalfile], flagbackup=False)
# put back into MODEL_DATA of ms, uvsub from DATA, and put result in CORRECTED_DATA
_model_data_uvsub(ms, modelmsfile)
shutil.rmtree(msfilemodels_XX)
shutil.rmtree(msfilemodels_XY)
shutil.rmtree(msfilemodels_YX)
shutil.rmtree(msfilemodels_YY)
shutil.rmtree(bcalfileflags)
shutil.rmtree(modelmsfile)
return ms
def _model_data_uvsub(msfile,msfilemodels,gain=1,add=False):
msfileallmodels = path.splitext(msfile)[0]+'_allmodels.ms'
msfilemodels_XX = path.splitext(msfilemodels)[0]+'XX.ms'
msfilemodels_XY = path.splitext(msfilemodels)[0]+'XY.ms'
msfilemodels_YX = path.splitext(msfilemodels)[0]+'YX.ms'
msfilemodels_YY = path.splitext(msfilemodels)[0]+'YY.ms'
# put back into MODEL_DATA of ms
xx = tables.table(msfilemodels_XX)
xy = tables.table(msfilemodels_XY)
yx = tables.table(msfilemodels_YX)
yy = tables.table(msfilemodels_YY)
#xxmodel = xx.getcol('CORRECTED_DATA')
xxmodel = xx.getcol('DATA')
xymodel = xy.getcol('CORRECTED_DATA')
yxmodel = yx.getcol('CORRECTED_DATA')
#yymodel = yy.getcol('CORRECTED_DATA')
yymodel = yy.getcol('DATA')
t = tables.table(msfile,readonly=False)
data = t.getcol('DATA')
model = t.getcol('MODEL_DATA')
model[:,:,0] = xxmodel[:,:,0]
model[:,:,1] = xymodel[:,:,1]
model[:,:,2] = yxmodel[:,:,2]
model[:,:,3] = yymodel[:,:,3]
if add:
t.putcol('CORRECTED_DATA',data+model*gain)
else:
t.putcol('CORRECTED_DATA',data-model*gain)
#t.putcol('MODEL_DATA',model*gain)
#t.close()
xx.close()
xy.close()
yx.close()
yy.close()
if not path.exists(msfileallmodels):
tables.tablecopy(msfilemodels,msfileallmodels)
tallmodels = tables.table(msfileallmodels,readonly=False)
allmodels = model*gain
else:
tallmodels = tables.table(msfileallmodels,readonly=False)
oldmodels = tallmodels.getcol('MODEL_DATA')
allmodels = oldmodels + model*gain
tallmodels.putcol('MODEL_DATA',allmodels)
t.putcol('MODEL_DATA',allmodels)
t.close()
tallmodels.close()
def _model_data_rearrange(modelmsfile):
tmodels = tables.table(modelmsfile,readonly=False)
data = tmodels.getcol('DATA')
corr = tmodels.getcol('CORRECTED_DATA')
newdata = np.copy(data)
tmodels.close()
msfilemodels_XX = path.splitext(modelmsfile)[0]+'XX.ms'
msfilemodels_XY = path.splitext(modelmsfile)[0]+'XY.ms'
msfilemodels_YX = path.splitext(modelmsfile)[0]+'YX.ms'
msfilemodels_YY = path.splitext(modelmsfile)[0]+'YY.ms'
tables.tablecopy(modelmsfile,msfilemodels_XX)
tables.tablecopy(modelmsfile,msfilemodels_XY)
tables.tablecopy(modelmsfile,msfilemodels_YX)
tables.tablecopy(modelmsfile,msfilemodels_YY)
xx = tables.table(msfilemodels_XX,readonly=False)
xy = tables.table(msfilemodels_XY,readonly=False)
yx = tables.table(msfilemodels_YX,readonly=False)
yy = tables.table(msfilemodels_YY,readonly=False)
# XXmodel
newdata[:,:,0] = corr[:,:,0]
newdata[:,:,1] = corr[:,:,0]*1/data[:,:,0]*data[:,:,1]
newdata[:,:,2] = corr[:,:,0]*1/data[:,:,0]*data[:,:,2]
newdata[:,:,3] = corr[:,:,0]*1/data[:,:,0]*data[:,:,3]
xx.putcol('DATA',newdata)
xx.close()
# XYmodel
newdata[:,:,0] = corr[:,:,1]*1/data[:,:,1]*data[:,:,0]
newdata[:,:,1] = corr[:,:,1]
newdata[:,:,2] = corr[:,:,1]*1/data[:,:,1]*data[:,:,2]
newdata[:,:,3] = corr[:,:,1]*1/data[:,:,1]*data[:,:,3]
xy.putcol('DATA',newdata)
xy.close()
# YXmodel
newdata[:,:,0] = corr[:,:,2]*1/data[:,:,2]*data[:,:,0]
newdata[:,:,1] = corr[:,:,2]*1/data[:,:,2]*data[:,:,1]
newdata[:,:,2] = corr[:,:,2]
newdata[:,:,3] = corr[:,:,2]*1/data[:,:,2]*data[:,:,3]
yx.putcol('DATA',newdata)
yx.close()
# YYmodel
newdata[:,:,0] = corr[:,:,3]*1/data[:,:,3]*data[:,:,0]
newdata[:,:,1] = corr[:,:,3]*1/data[:,:,3]*data[:,:,1]
newdata[:,:,2] = corr[:,:,3]*1/data[:,:,3]*data[:,:,2]
newdata[:,:,3] = corr[:,:,3]
yy.putcol('DATA',newdata)
yy.close()
return msfilemodels_XX, msfilemodels_XY, msfilemodels_YX, msfilemodels_YY
def _model_to_data(msfile):
t = tables.table(msfile, readonly=False)
model_data = t.getcol('MODEL_DATA')
t.putcol('DATA',model_data)
t.close()
return msfile
def _cal_reverse(bcalfile: str, dcalfile: str):
bcalfileflags = path.splitext(bcalfile)[0]+'_flags.bcal'
tables.tablecopy(bcalfile,bcalfileflags)
b = tables.table(bcalfile, readonly=False)
d = tables.table(dcalfile, readonly=False)
bgains = b.getcol('CPARAM') # bgains.shape = (256,109,2)
bflags = b.getcol('FLAG')
dgains = d.getcol('CPARAM')
dflags = d.getcol('FLAG')
b.putcol('CPARAM',1/bgains)
d.putcol('CPARAM',-dgains)
b.close()
d.close()
bf = tables.table(bcalfileflags, readonly=False)
bgains[:,:,:] = 1
#if np.shape(bflags)[1] != 109:
# bflags = np.repeat(bflags,int(np.ceil(109/np.shape(bflags)[1])),axis=1)[:,0:109,:]
# bgains = np.repeat(bgains,int(np.ceil(109/np.shape(bflags)[1])),axis=1)[:,0:109,:]
bfflags = bflags | dflags
bf.putcol('CPARAM',bgains)
bf.putcol('FLAG',bflags)
bf.close()
return bcalfile, dcalfile