Module diskchef.uv.uvfits_to_visibilities_ascii

Module with functions to convert GILDAS UVTable to GALARIO visibilities format

Expand source code
"""Module with functions to convert GILDAS UVTable to GALARIO visibilities format"""
import math
import pickle

import os
import textwrap
from pathlib import Path
from typing import Union, Literal, Sequence, List
import logging
import subprocess

import numpy as np
from astropy import constants as c
from astropy import units as u
from astropy.table import Table, QTable
import astropy.io.fits
import astropy.wcs
from matplotlib import pyplot as plt
import matplotlib.axes
import matplotlib.figure
import spectral_cube

try:
    import galario

    if galario.HAVE_CUDA:
        from galario import double_cuda as g_double
        from galario import single_cuda as g_single
    else:
        from galario import double as g_double
        from galario import single as g_single

except ModuleNotFoundError:
    print("Install galario:")
    print("$ conda install -c conda-forge galario")
    print("Works only on linux and osx 64 bit (not Windows)")

from diskchef.engine.exceptions import CHEFTypeError, CHEFValueError, CHEFNotImplementedError
from diskchef.engine.other import PathLike

import warnings
from spectral_cube.utils import SpectralCubeWarning

warnings.filterwarnings(action='ignore', category=SpectralCubeWarning,
                        append=True)


class UVFits:
    """
    Reads GILDAS-outputted visibilities UVFits file

    Args:
        path: PathLike -- path to the uv fits table
        channel: which channel to take from the file
            int -- single channel
            list -- given channels
            slice -- (a:b:c == slice(a,b,c)) -- slice of channels
            'all' -- all channels from the file
        sum: bool -- calculate weigthed mean of all channels (for continuum)

    Usage:

    >>> uv = UVFits(
    ...     os.path.join(os.path.dirname(__file__), "..", "tests", "data", "s-Wide-1+C.uvfits"),
    ...     'all', sum=True
    ... )
    >>> uv.table[0:5].pprint_all()  # doctest: +NORMALIZE_WHITESPACE
             u                   v                 Re [1]               Im [1]            Weight [1]
             m                   m                   Jy                   Jy               1 / Jy2
    ------------------- ------------------- ------------------- --------------------- ------------------
    -0.8546872724500936 -111.88782560913619 0.08481375196790505   0.02011162435339697  4.392747296119472
      35.68536730327412   61.11044271472704 0.14395800105016973 0.0011129999808199087  4.906656253315232
      36.53999262455663  172.99830358929054 0.04616766278918842 -0.009757890697913866  4.810738961168015
     -82.12765585917153 -41.241167812120636 0.14572769842806682  0.012205666181597085 2.6732435871384994
     -81.27303053788901   70.64653734266903  0.1260857611090533 -0.018951768666146913  2.634196780195305

    >>> uv = UVFits(
    ...     os.path.join(os.path.dirname(__file__), "..", "tests", "data", "s-Wide-1+C.uvfits"),
    ...     [1,2,3], sum=False
    ... )
    >>> uv.table[0:5].pprint_all()  # doctest: +NORMALIZE_WHITESPACE
             u                   v                             Re [3]                                        Im [3]                                     Weight [3]
             m                   m                               Jy                                            Jy                                          1 / Jy2
    ------------------- ------------------- ------------------------------------------- ----------------------------------------------- ------------------------------------------
    -0.8546872724500936 -111.88782560913619  0.08013948631211772 .. 0.07302437694644187    0.022133425091247466 .. 0.013778203363983665 0.19701689428642943 .. 0.20917844580050068
      35.68536730327412   61.11044271472704   0.14916945376238983 .. 0.1606281736906061   0.004351084873651592 .. -0.008113799327774212 0.22006599150348807 .. 0.23365029837275292
      36.53999262455663  172.99830358929054 0.02026272564921367 .. 0.051114215373805026 -0.007280084597144622 .. -0.0062913988333082845 0.21576404157411666 .. 0.22908280648299315
     -82.12765585917153 -41.241167812120636  0.13376226427333338 .. 0.18171489805253205     0.014664527234772186 .. 0.03560468549769666 0.11989631175397047 .. 0.12729731479045647
     -81.27303053788901   70.64653734266903   0.1422663387734871 .. 0.15214664685856188  -0.008367051589066395 .. -0.017851764309400768 0.11814501360102375 .. 0.12543794548908482
    """

    def __init__(
            self,
            path: PathLike,
            channel: Union[int, Sequence, slice, Literal['all']] = 'all',
            sum: bool = False
    ):
        self.logger = logging.getLogger(__name__ + '.' + self.__class__.__qualname__)
        self.logger.info("Creating an instance of %s", self.__class__.__qualname__)
        self.logger.debug("With parameters: %s", self.__dict__)

        self.path = os.path.abspath(path)
        self.restfreq = None
        if self.path.lower().endswith((".uvfits", ".fits")):
            self._fits = Table.read(path, hdu=0, memmap=False)
            self.table = QTable()
            self.u = self._fits['UU'].data * (u.s * c.c)
            self.v = self._fits['VV'].data * (u.s * c.c)
            if channel in ('all', Ellipsis):
                self.fetch_channel = Ellipsis
            elif isinstance(channel, int):
                self.fetch_channel = slice(channel, channel + 1)
            elif isinstance(channel, (slice, Sequence)):
                self.fetch_channel = channel
            else:
                raise CHEFTypeError("channel should be either: int, Sequence, slice, 'all', Ellipsis'")
            data = self._fits['DATA'][:, 0, 0, 0, self.fetch_channel, 0, :]
            self.fetched_channels = np.arange(self._fits['DATA'].shape[4])[self.fetch_channel]
            self.re = data[:, :, 0] << u.Jy
            self.im = data[:, :, 1] << u.Jy
            self.weight = data[:, :, 2] << (u.Jy ** -2)
            if sum:
                total_weight = np.sum(self.weight, axis=1, keepdims=True)
                self.re = np.sum(self.weight * self.re, axis=1, keepdims=True) / total_weight
                self.im = np.sum(self.weight * self.im, axis=1, keepdims=True) / total_weight
                self.weight = total_weight
                self.fetched_channels = np.mean(self.fetched_channels, keepdims=True)
            self._update_table()
            self.frequencies = (
                    (self._fits.meta['CRVAL4'] +
                     (self.fetched_channels - self._fits.meta['CRPIX4'] + 1) * self._fits.meta['CDELT4']
                     ) * u.Hz)
            self.restfreq = self._fits.meta.get("RESTFREQ", None)
        elif self.path.lower().endswith(".pkl"):
            self._fits = None
            with open(self.path, "rb") as pkl:
                self.u, self.v = pickle.load(pkl)
                self.re = np.empty_like(self.u)
                self.im = np.empty_like(self.u)
                self.weight = np.empty_like(self.u)
                self.table = astropy.table.QTable()
                self._update_table()
            self.restfreq = self._fits.meta.get("RESTFREQ", None)
        else:
            raise CHEFValueError(
                "Unknown file format (%s): "
                "expecting '.fits' / '.uvfits' for casa-style UVFITS file "
                "or '.pkl' for previously pickled UVFits instance" % path
            )

    kl = u.def_unit('kλ', 1000 * u.dimensionless_unscaled)

    @property
    def data(self):
        return self._fits['DATA'][:, 0, 0, 0, :, 0, :]

    @data.setter
    def data(self, value):
        if self._fits["DATA"].shape != value.shape:
            raise CHEFValueError("Shape mismatch! Use UVFits.set_data instead!")
        self._update_fits(value)

    def _update_table(self):
        self.table['u'] = self.u
        self.table['v'] = self.v
        self.table['Re'] = self.re
        self.table['Im'] = self.im
        self.table['Weight'] = self.weight

    def _update_fits(self, data, frequencies=None):
        self._fits['DATA'] = data
        self.re = self.data[:, :, 0] << u.Jy
        self.im = self.data[:, :, 1] << u.Jy
        self.weight = self.data[:, :, 2] << (u.Jy ** -2)
        self.table['Re'] = self.re
        self.table['Im'] = self.im
        self.table['Weight'] = self.weight
        self._update_table()

    @u.quantity_input
    def set_data(self, data: np.ndarray, frequencies: u.Hz = None):
        """Set new visibilities data (and, optionally, frequencies) to UVFits instance"""
        if self._fits is not None:
            if self._fits["DATA"].shape != data.shape:
                if frequencies.shape != self.data.shape[1] and self.frequencies is None:
                    raise CHEFValueError("Shape mismatch!")
                self.frequencies = frequencies
            self._update_fits(data, frequencies)
        else:
            self.re = data[:, 0, 0, 0, :, 0, :][:, :, 0] << u.Jy
            self.im = data[:, 0, 0, 0, :, 0, :][:, :, 1] << u.Jy
            self.weight = data[:, 0, 0, 0, :, 0, :][:, :, 2] << (u.Jy ** -2)
            self.table['Re'] = self.re
            self.table['Im'] = self.im
            self.table['Weight'] = self.weight

    @property
    def rest_freq(self) -> u.Hz:
        return self._fits.meta.get("RESTFREQ", None) * u.Hz

    @property
    def wavelengths(self):
        return c.c / self.frequencies

    def plot_uvgrid(
            self,
            axes: matplotlib.axes.Axes = None,
            kl: bool = False, restfreq: u.Hz = None,
            symsize: float = 1,
            **kwargs
    ) -> matplotlib.axes.Axes:
        """

        Args:
            axes: axes to draw the map on
            kl: if True, plots in dimensionless [kλ], thousands of wavelength. If more than one frequency is present,
            and restfreq is not set, the mean is taken
            restfreq: u.spectral - equivalent unit, the rest frequency to use when kl is True
            kwargs: to be passed to axes.scatter

        Returns:
            axes with the uv grid plotted
        """
        if axes is None:
            _, axes = plt.subplots(1)
            axes.set_aspect('equal')

        xplot = u.Quantity([*self.u, *(-self.u)])
        yplot = u.Quantity([*self.v, *(-self.v)])

        if restfreq is None:
            restfreq = np.median(self.frequencies)
        restwl = restfreq.to(u.m, equivalencies=u.spectral())
        if kl:
            xplot = (xplot / restwl).to(self.kl)
            yplot = (yplot / restwl).to(self.kl)

        sizes = symsize * self.weight[:, 0] / (self.weight.max())
        sizes = np.array([*sizes, *sizes])
        axes.invert_xaxis()
        axes.scatter(xplot, yplot, alpha=0.5, s=sizes, **kwargs)
        return axes

    def plot_uv_channel_map(
            self,
            nx: int = None, ny: int = None,
            channels: Union[range, List[int], Literal[Ellipsis]] = Ellipsis,
            subplot_kw: dict = None,
            gridspec_kw: dict = (("hspace", 0), ("wspace", 0)),
            fig_kw: dict = None,
            symsize: float = 0.5,
            **kwargs
    ) -> matplotlib.figure.Figure:
        gridspec_kw = dict(gridspec_kw)
        data_to_plot = self.data[:, channels, :]
        re = data_to_plot[:, :, 0]
        im = data_to_plot[:, :, 1]

        frequencies = self.frequencies[channels]
        nchannels = re.shape[1]

        if nx is None and ny is not None:
            nx = math.ceil(nchannels / ny)
        elif nx is not None and ny is None:
            ny = math.ceil(nchannels / nx)
        elif nx is not None and ny is not None:
            if nx * ny > nchannels:
                raise CHEFValueError(f"Inconsistent number of channels: nx * ny = {nx * ny} > nchannels = {nchannels}")
        else:
            ny = nx = math.ceil(math.sqrt(nchannels))

        if fig_kw is None: fig_kw = {}
        fig, axes = plt.subplots(
            ny, nx, squeeze=False, subplot_kw=subplot_kw, gridspec_kw=gridspec_kw,
            sharex="all", sharey="all",
            **fig_kw
        )
        axes.flatten()[0].invert_xaxis()
        for ax in axes.flatten(): ax.set_visible(False)
        xplot = u.Quantity([*self.u, *(-self.u)])
        yplot = u.Quantity([*self.v, *(-self.v)])

        visibility = re + 1j * im
        visibility = [*visibility, *visibility] * u.Jy
        color = np.angle(visibility)
        sizes = np.abs(visibility)
        sizes /= sizes.max() * symsize

        for vis, freq, s, col, ax in zip(visibility.T, frequencies, sizes.T, color.T, axes.flatten()):
            ax.set_visible(True)
            restwl = freq.to(u.m, equivalencies=u.spectral())
            xxplot = (xplot / restwl).to(self.kl)
            yyplot = (yplot / restwl).to(self.kl)
            ax.scatter(xxplot, yyplot, s=s, c=col, cmap="twilight", **kwargs)
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)

        axes[ny - 1, 0].get_xaxis().set_visible(True)
        axes[ny - 1, 0].get_yaxis().set_visible(True)

        return fig

    def plot_total_power(
            self,
            rest_freq: u.Hz = None,
            axes: matplotlib.axes.Axes = None,
            axes_unit: Union[u.Unit, Literal["channel"]] = None,
    ) -> matplotlib.axes.Axes:
        """Plot the sum of visibilities collected by the interferometer.

        This is not actually the total power, but gives the idea for the bright lines

        Args:
            rest_freq: u.Hz - the rest frequency of the line to convert axes to km/s
            axes: matplotlib.axes.Axes - the axes to plot on
            axes_unit: u.Unit - the unit to convert the axes to, defaults to the original axes unit (likely Hz)
        """
        if rest_freq is not None:
            frequencies = self.frequencies.to(u.km / u.s, equivalencies=u.doppler_radio(rest_freq))
        else:
            frequencies = self.frequencies
        if axes_unit is not None:
            if axes_unit == "channel":
                frequencies = np.arange(len(frequencies))
            else:
                frequencies = frequencies.to(axes_unit)
        if axes is None:
            _, axes = plt.subplots(1)
        axes.plot(
            frequencies,
            np.sum(np.abs((self.visibility * self.weight) / self.weight.sum()), axis=0)
        )
        return axes

    def image_to_visibilities(self, cube: Union[spectral_cube.SpectralCube, PathLike]):
        """Import cube from a FITS `file`, sample it with visibilities of this UVFITS"""
        if isinstance(cube, spectral_cube.SpectralCube):
            pass
        else:
            cube = spectral_cube.SpectralCube.read(cube)
        pixel_area_units = u.Unit(cube.wcs.celestial.world_axis_units[0]) * u.Unit(
            cube.wcs.celestial.world_axis_units[1])
        pixel_area = astropy.wcs.utils.proj_plane_pixel_area(cube.wcs.celestial) * pixel_area_units
        dxy = np.sqrt(pixel_area).to_value(u.rad)
        cube = (cube.to(u.Jy / u.sr) * pixel_area).to(u.Jy)

        visibilities = []
        for i, frequency in enumerate(
                cube.with_spectral_unit(u.Hz, velocity_convention="radio").spectral_axis
        ):
            wl = (c.c / frequency).si
            u_wavelengths = (self.u / wl).to_value(u.dimensionless_unscaled)
            v_wavelengths = (self.v / wl).to_value(u.dimensionless_unscaled)
            vis = g_double.sampleImage(cube[i], dxy, u_wavelengths, v_wavelengths, origin='lower')
            visibilities.append(vis)
        visibilities = np.array(visibilities)
        visibilities_real_imag_weight = np.array(
            [visibilities.real, visibilities.imag, np.ones_like(visibilities.imag)]
        )
        uvdata_arr = visibilities_real_imag_weight.T.reshape(
            visibilities.shape[1], 1, 1, 1, visibilities.shape[0], 1, 3
        )
        self.set_data(uvdata_arr, cube.with_spectral_unit(u.Hz).spectral_axis)
        self.frequencies = cube.spectral_axis

    @property
    @u.quantity_input
    def visibility(self) -> u.Jy:
        """Returns complex array of visibities"""
        return self.re + self.im * 1j

    def pickle(self, filename: PathLike = None):
        """Pickle the UV coordinates so they can be reloaded later

        Args:
            filename: Pathlike, default: self.path + ".pkl". Should end with ".pkl"`"
        """
        if filename is None:
            filename = str(self.path) + ".pkl"
        else:
            if not filename.lower().endswith(".pkl"):
                self.logger.warning("If the pickle filename does not end with .pkl, "
                                    "it won't be automatically recognized")
        with open(filename, "wb") as uvpkl:
            pickle.dump((self.u, self.v), uvpkl)

    @property
    def degrees_of_freedom(self):
        return np.prod(self.re.shape)

    def chi2_with(self, data=Union[PathLike, spectral_cube.SpectralCube], threads: int = None, **kwargs) -> float:
        """Method to calculate chi-squared of a given UV set with a data cube

        Args:
            data: PathLike -- path to data cube readable by SpectralCube.read OR the spectral cube itself
            threads: int -- number of threads for galario. Sets globally until called again with non-default value.
        """
        if threads is not None:
            g_double.threads(threads)

        if not isinstance(data, spectral_cube.SpectralCube):
            data = spectral_cube.SpectralCube.read(data)
        if data.spectral_axis.unit != self.frequencies.unit:
            data = data.with_spectral_unit(self.frequencies.unit, velocity_convention="radio")
        self.logger.debug("Data spectral axis: %s", data.spectral_axis)
        self.logger.debug("UVTable spectral axis: %s", self.frequencies)
        if (data.spectral_axis.shape != self.frequencies.shape) or (
                not np.all(np.equal(data.spectral_axis, self.frequencies))):
            self.logger.info("Interpolate data to UVTable spectral grid")
            data = data.spectral_interpolate(spectral_grid=self.frequencies)

        pixel_area_units = u.Unit(data.wcs.celestial.world_axis_units[0]) \
                           * u.Unit(data.wcs.celestial.world_axis_units[1])
        pixel_area = astropy.wcs.utils.proj_plane_pixel_area(data.wcs.celestial) * pixel_area_units
        dxy = np.sqrt(pixel_area).to_value(u.rad)
        self.logger.debug("Data pixel area %s and size %s radian", pixel_area, dxy)
        data = (data.to(u.Jy / u.sr) * pixel_area).to(u.Jy)
        self.chi_per_channel = []
        for cube_slice, _wavelength, _re, _im, _weight in zip(
                data, self.wavelengths, self.re.T, self.im.T, self.weight.T
        ):
            chi_per_channel = g_double.chi2Image(
                image=cube_slice,
                dxy=dxy,
                u=(self.u / _wavelength).to_value(u.dimensionless_unscaled),
                v=(self.v / _wavelength).to_value(u.dimensionless_unscaled),
                vis_obs_re=_re.astype('float64').to_value(u.Jy),
                vis_obs_im=_im.astype('float64').to_value(u.Jy),
                vis_obs_w=_weight.astype('float64').to_value(u.Jy ** -2),
                origin='lower',
                **kwargs
            )
            self.chi_per_channel.append(chi_per_channel)
        return float(np.sum(self.chi_per_channel))

    @classmethod
    def write_visibilities_to_uvfits(
            cls,
            data_to_write: Union[PathLike, spectral_cube.SpectralCube],
            file_to_modify: PathLike,
            output_filename: PathLike = None,
            uv_kwargs: dict = None,
            residual: bool = False,
    ) -> PathLike:
        """
        Replace visibilities of `file_to_modify` to visibilities sampled from `data_to_write`,
        save into `output_filename`.

        Args:
            data_to_write: PathLike or SpectralCube, data to replace the original table
            file_to_modify: PathLike, uvfits file with visibilities and frequencies to use as a reference
            output_filename: PathLike, optional: name of new uvfits file, `file_to_modify`.modified.uvfits by default
            uv_kwargs: kwargs to be passed to UVFits initialization, ie `sum` or `channel`
            residual: if True, write input residuals `data_to_be_modified - data_to_write` instead of `data_to_write`
        Returns:
            output file name

        Usage:
            >>> UVFits.write_visibilities_to_uvfits(  # doctest: +SKIP
            ...     data_to_write="13CO J=2-1_image.fits",
            ...     file_to_modify="13co.uvfits",
            ...     output_filename="13co_model.uvfits",
            ...     uv_kwargs={'sum': False}
            ... )
        """
        if uv_kwargs is None:
            uv_kwargs = {}
        if output_filename is None:
            output_filename = Path(f"{file_to_modify}.modified.uvfits")

        if not isinstance(data_to_write, spectral_cube.SpectralCube):
            data_to_write = spectral_cube.SpectralCube.read(data_to_write)

        uvfits = cls(file_to_modify, **uv_kwargs)
        uvfits.image_to_visibilities(
            data_to_write.spectral_interpolate(uvfits.frequencies)
        )
        data_to_write_array = np.array(uvfits.data)

        hdul_to_modify = astropy.io.fits.open(file_to_modify, lazy_load_hdus=False, memmap=False)
        data_to_be_modified = hdul_to_modify[0].data["DATA"][:, 0, 0, 0, :, 0, :]
        if residual:
            data_to_be_modified[:, :, 0:2] -= data_to_write_array[:, :, 0:2]
        else:
            data_to_be_modified[:, :, 0:2] = data_to_write_array[:, :, 0:2]

        hdul_to_modify.writeto(output_filename, overwrite=True)

        return output_filename

    @classmethod
    def run_gildas_script(
            cls,
            script: str = "SAY HELLO WORLD",
            gildas_executable: PathLike = "imager",
            script_filename: PathLike = "last.imager",
            folder=None,
    ) -> subprocess.CompletedProcess:
        """
        Send script to GILDAS.

        Args:
            script: GILDAS script to run
            gildas_executable: path to GILDAS executable (imager, astro, etc.)
            script_filename: filename for script as it will be saved as a file
            folder: working directory for script

        Returns:
            subprocess.CompletedProcess instance with .stdout and .stderr attributes
        """
        if folder is None:
            folder = Path.cwd()

        script_filename = Path(script_filename)
        with open(script_filename, "w") as fff:
            fff.write(textwrap.dedent(script))

        command = f'cat "{script_filename.resolve()}" | {gildas_executable} -nw'
        logging.debug("%s$ %s", folder, command)
        proc = subprocess.run(
            command,
            capture_output=True, encoding='utf8', shell=True,
            cwd=folder
        )
        return proc

    @classmethod
    def run_imaging(
            cls,
            input_file: PathLike,
            name: str,
            imager_executable: PathLike = "imager",
            script_template: str = """
                        FITS "{input_file}" TO "{name}"
                        READ UV "{name}"
                        UV_MAP
                        CLEAN
                        LUT {lut}
                        ! VIEW CLEAN /NOPAUSE
                        LET SIZE 10
                        LET DO_CONTOUR NO
                        SHOW CLEAN
                        HARDCOPY "{name}.{device}" /DEVICE {device} /OVERWRITE
                """,
            script_filename: PathLike = "last.imager",
            device: Union[Literal["pdf", "png", "eps", "ps"], str] = "pdf",
            lut: str = "inferno",
            **kwargs
    ) -> subprocess.CompletedProcess:
        """
        Send script to GILDAS. The primary use is to send an imaging script to IMAGER

        Args:
            input_file: path to .uvfits file to be imaged, passed to `script_template.format
            name: basename for the output files, passed to `script_template.format. Whitespaces are replaced with '_'
            imager_executable: path to imager executable, if not in system PATH
            script_template: string containing script with parameters listed in {} for .format
            script_filename: the filename for the created script
            device: DEVICE keyword for `[GTVL\]HARDCOPY`, the output figure file format,
                passed to `script_template.format
            lut: argument for `[GTVL\]LUT` for colormap setting, `inferno` by default, passed to `script_template.format
            **kwargs: other arguments passed to `script_template.format`

        Returns:
            subprocess.CompletedProcess instance with .stdout and .stderr attributes

        Usage:
            >>> UVFits.run_imaging(input_file="model.uvfits", name="model")  # doctest: +SKIP
        """

        script_filename = Path(script_filename)
        input_file = Path(input_file)

        proc = cls.run_gildas_script(
            script=script_template.format(
                input_file=input_file.name,
                name=name,
                lut=lut,
                device=device,
                **kwargs
            ),
            gildas_executable=imager_executable,
            script_filename=script_filename,
            folder=input_file.parent
        )
        return proc

    @classmethod
    def run_gildas(cls, *args, **kwargs):
        """Deprecated, renamed to run_imaging. Alsom see run_gildas_script to run arbitrary GILDAS scripts."""
        warnings.warn("run_gildas is deprecated, use run_imaging instead", DeprecationWarning)
        return cls.run_imaging(*args, **kwargs)

Classes

class UVFits (path: Union[str, os.PathLike], channel: Union[int, Sequence, slice, Literal['all']] = 'all', sum: bool = False)

Reads GILDAS-outputted visibilities UVFits file

Args

path
PathLike – path to the uv fits table
channel
which channel to take from the file int – single channel list – given channels slice – (a:b:c == slice(a,b,c)) – slice of channels 'all' – all channels from the file
sum
bool – calculate weigthed mean of all channels (for continuum)

Usage:

>>> uv = UVFits(
...     os.path.join(os.path.dirname(__file__), "..", "tests", "data", "s-Wide-1+C.uvfits"),
...     'all', sum=True
... )
>>> uv.table[0:5].pprint_all()  # doctest: +NORMALIZE_WHITESPACE
         u                   v                 Re [1]               Im [1]            Weight [1]
         m                   m                   Jy                   Jy               1 / Jy2
------------------- ------------------- ------------------- --------------------- ------------------
-0.8546872724500936 -111.88782560913619 0.08481375196790505   0.02011162435339697  4.392747296119472
  35.68536730327412   61.11044271472704 0.14395800105016973 0.0011129999808199087  4.906656253315232
  36.53999262455663  172.99830358929054 0.04616766278918842 -0.009757890697913866  4.810738961168015
 -82.12765585917153 -41.241167812120636 0.14572769842806682  0.012205666181597085 2.6732435871384994
 -81.27303053788901   70.64653734266903  0.1260857611090533 -0.018951768666146913  2.634196780195305
>>> uv = UVFits(
...     os.path.join(os.path.dirname(__file__), "..", "tests", "data", "s-Wide-1+C.uvfits"),
...     [1,2,3], sum=False
... )
>>> uv.table[0:5].pprint_all()  # doctest: +NORMALIZE_WHITESPACE
         u                   v                             Re [3]                                        Im [3]                                     Weight [3]
         m                   m                               Jy                                            Jy                                          1 / Jy2
------------------- ------------------- ------------------------------------------- ----------------------------------------------- ------------------------------------------
-0.8546872724500936 -111.88782560913619  0.08013948631211772 .. 0.07302437694644187    0.022133425091247466 .. 0.013778203363983665 0.19701689428642943 .. 0.20917844580050068
  35.68536730327412   61.11044271472704   0.14916945376238983 .. 0.1606281736906061   0.004351084873651592 .. -0.008113799327774212 0.22006599150348807 .. 0.23365029837275292
  36.53999262455663  172.99830358929054 0.02026272564921367 .. 0.051114215373805026 -0.007280084597144622 .. -0.0062913988333082845 0.21576404157411666 .. 0.22908280648299315
 -82.12765585917153 -41.241167812120636  0.13376226427333338 .. 0.18171489805253205     0.014664527234772186 .. 0.03560468549769666 0.11989631175397047 .. 0.12729731479045647
 -81.27303053788901   70.64653734266903   0.1422663387734871 .. 0.15214664685856188  -0.008367051589066395 .. -0.017851764309400768 0.11814501360102375 .. 0.12543794548908482
Expand source code
class UVFits:
    """
    Reads GILDAS-outputted visibilities UVFits file

    Args:
        path: PathLike -- path to the uv fits table
        channel: which channel to take from the file
            int -- single channel
            list -- given channels
            slice -- (a:b:c == slice(a,b,c)) -- slice of channels
            'all' -- all channels from the file
        sum: bool -- calculate weigthed mean of all channels (for continuum)

    Usage:

    >>> uv = UVFits(
    ...     os.path.join(os.path.dirname(__file__), "..", "tests", "data", "s-Wide-1+C.uvfits"),
    ...     'all', sum=True
    ... )
    >>> uv.table[0:5].pprint_all()  # doctest: +NORMALIZE_WHITESPACE
             u                   v                 Re [1]               Im [1]            Weight [1]
             m                   m                   Jy                   Jy               1 / Jy2
    ------------------- ------------------- ------------------- --------------------- ------------------
    -0.8546872724500936 -111.88782560913619 0.08481375196790505   0.02011162435339697  4.392747296119472
      35.68536730327412   61.11044271472704 0.14395800105016973 0.0011129999808199087  4.906656253315232
      36.53999262455663  172.99830358929054 0.04616766278918842 -0.009757890697913866  4.810738961168015
     -82.12765585917153 -41.241167812120636 0.14572769842806682  0.012205666181597085 2.6732435871384994
     -81.27303053788901   70.64653734266903  0.1260857611090533 -0.018951768666146913  2.634196780195305

    >>> uv = UVFits(
    ...     os.path.join(os.path.dirname(__file__), "..", "tests", "data", "s-Wide-1+C.uvfits"),
    ...     [1,2,3], sum=False
    ... )
    >>> uv.table[0:5].pprint_all()  # doctest: +NORMALIZE_WHITESPACE
             u                   v                             Re [3]                                        Im [3]                                     Weight [3]
             m                   m                               Jy                                            Jy                                          1 / Jy2
    ------------------- ------------------- ------------------------------------------- ----------------------------------------------- ------------------------------------------
    -0.8546872724500936 -111.88782560913619  0.08013948631211772 .. 0.07302437694644187    0.022133425091247466 .. 0.013778203363983665 0.19701689428642943 .. 0.20917844580050068
      35.68536730327412   61.11044271472704   0.14916945376238983 .. 0.1606281736906061   0.004351084873651592 .. -0.008113799327774212 0.22006599150348807 .. 0.23365029837275292
      36.53999262455663  172.99830358929054 0.02026272564921367 .. 0.051114215373805026 -0.007280084597144622 .. -0.0062913988333082845 0.21576404157411666 .. 0.22908280648299315
     -82.12765585917153 -41.241167812120636  0.13376226427333338 .. 0.18171489805253205     0.014664527234772186 .. 0.03560468549769666 0.11989631175397047 .. 0.12729731479045647
     -81.27303053788901   70.64653734266903   0.1422663387734871 .. 0.15214664685856188  -0.008367051589066395 .. -0.017851764309400768 0.11814501360102375 .. 0.12543794548908482
    """

    def __init__(
            self,
            path: PathLike,
            channel: Union[int, Sequence, slice, Literal['all']] = 'all',
            sum: bool = False
    ):
        self.logger = logging.getLogger(__name__ + '.' + self.__class__.__qualname__)
        self.logger.info("Creating an instance of %s", self.__class__.__qualname__)
        self.logger.debug("With parameters: %s", self.__dict__)

        self.path = os.path.abspath(path)
        self.restfreq = None
        if self.path.lower().endswith((".uvfits", ".fits")):
            self._fits = Table.read(path, hdu=0, memmap=False)
            self.table = QTable()
            self.u = self._fits['UU'].data * (u.s * c.c)
            self.v = self._fits['VV'].data * (u.s * c.c)
            if channel in ('all', Ellipsis):
                self.fetch_channel = Ellipsis
            elif isinstance(channel, int):
                self.fetch_channel = slice(channel, channel + 1)
            elif isinstance(channel, (slice, Sequence)):
                self.fetch_channel = channel
            else:
                raise CHEFTypeError("channel should be either: int, Sequence, slice, 'all', Ellipsis'")
            data = self._fits['DATA'][:, 0, 0, 0, self.fetch_channel, 0, :]
            self.fetched_channels = np.arange(self._fits['DATA'].shape[4])[self.fetch_channel]
            self.re = data[:, :, 0] << u.Jy
            self.im = data[:, :, 1] << u.Jy
            self.weight = data[:, :, 2] << (u.Jy ** -2)
            if sum:
                total_weight = np.sum(self.weight, axis=1, keepdims=True)
                self.re = np.sum(self.weight * self.re, axis=1, keepdims=True) / total_weight
                self.im = np.sum(self.weight * self.im, axis=1, keepdims=True) / total_weight
                self.weight = total_weight
                self.fetched_channels = np.mean(self.fetched_channels, keepdims=True)
            self._update_table()
            self.frequencies = (
                    (self._fits.meta['CRVAL4'] +
                     (self.fetched_channels - self._fits.meta['CRPIX4'] + 1) * self._fits.meta['CDELT4']
                     ) * u.Hz)
            self.restfreq = self._fits.meta.get("RESTFREQ", None)
        elif self.path.lower().endswith(".pkl"):
            self._fits = None
            with open(self.path, "rb") as pkl:
                self.u, self.v = pickle.load(pkl)
                self.re = np.empty_like(self.u)
                self.im = np.empty_like(self.u)
                self.weight = np.empty_like(self.u)
                self.table = astropy.table.QTable()
                self._update_table()
            self.restfreq = self._fits.meta.get("RESTFREQ", None)
        else:
            raise CHEFValueError(
                "Unknown file format (%s): "
                "expecting '.fits' / '.uvfits' for casa-style UVFITS file "
                "or '.pkl' for previously pickled UVFits instance" % path
            )

    kl = u.def_unit('kλ', 1000 * u.dimensionless_unscaled)

    @property
    def data(self):
        return self._fits['DATA'][:, 0, 0, 0, :, 0, :]

    @data.setter
    def data(self, value):
        if self._fits["DATA"].shape != value.shape:
            raise CHEFValueError("Shape mismatch! Use UVFits.set_data instead!")
        self._update_fits(value)

    def _update_table(self):
        self.table['u'] = self.u
        self.table['v'] = self.v
        self.table['Re'] = self.re
        self.table['Im'] = self.im
        self.table['Weight'] = self.weight

    def _update_fits(self, data, frequencies=None):
        self._fits['DATA'] = data
        self.re = self.data[:, :, 0] << u.Jy
        self.im = self.data[:, :, 1] << u.Jy
        self.weight = self.data[:, :, 2] << (u.Jy ** -2)
        self.table['Re'] = self.re
        self.table['Im'] = self.im
        self.table['Weight'] = self.weight
        self._update_table()

    @u.quantity_input
    def set_data(self, data: np.ndarray, frequencies: u.Hz = None):
        """Set new visibilities data (and, optionally, frequencies) to UVFits instance"""
        if self._fits is not None:
            if self._fits["DATA"].shape != data.shape:
                if frequencies.shape != self.data.shape[1] and self.frequencies is None:
                    raise CHEFValueError("Shape mismatch!")
                self.frequencies = frequencies
            self._update_fits(data, frequencies)
        else:
            self.re = data[:, 0, 0, 0, :, 0, :][:, :, 0] << u.Jy
            self.im = data[:, 0, 0, 0, :, 0, :][:, :, 1] << u.Jy
            self.weight = data[:, 0, 0, 0, :, 0, :][:, :, 2] << (u.Jy ** -2)
            self.table['Re'] = self.re
            self.table['Im'] = self.im
            self.table['Weight'] = self.weight

    @property
    def rest_freq(self) -> u.Hz:
        return self._fits.meta.get("RESTFREQ", None) * u.Hz

    @property
    def wavelengths(self):
        return c.c / self.frequencies

    def plot_uvgrid(
            self,
            axes: matplotlib.axes.Axes = None,
            kl: bool = False, restfreq: u.Hz = None,
            symsize: float = 1,
            **kwargs
    ) -> matplotlib.axes.Axes:
        """

        Args:
            axes: axes to draw the map on
            kl: if True, plots in dimensionless [kλ], thousands of wavelength. If more than one frequency is present,
            and restfreq is not set, the mean is taken
            restfreq: u.spectral - equivalent unit, the rest frequency to use when kl is True
            kwargs: to be passed to axes.scatter

        Returns:
            axes with the uv grid plotted
        """
        if axes is None:
            _, axes = plt.subplots(1)
            axes.set_aspect('equal')

        xplot = u.Quantity([*self.u, *(-self.u)])
        yplot = u.Quantity([*self.v, *(-self.v)])

        if restfreq is None:
            restfreq = np.median(self.frequencies)
        restwl = restfreq.to(u.m, equivalencies=u.spectral())
        if kl:
            xplot = (xplot / restwl).to(self.kl)
            yplot = (yplot / restwl).to(self.kl)

        sizes = symsize * self.weight[:, 0] / (self.weight.max())
        sizes = np.array([*sizes, *sizes])
        axes.invert_xaxis()
        axes.scatter(xplot, yplot, alpha=0.5, s=sizes, **kwargs)
        return axes

    def plot_uv_channel_map(
            self,
            nx: int = None, ny: int = None,
            channels: Union[range, List[int], Literal[Ellipsis]] = Ellipsis,
            subplot_kw: dict = None,
            gridspec_kw: dict = (("hspace", 0), ("wspace", 0)),
            fig_kw: dict = None,
            symsize: float = 0.5,
            **kwargs
    ) -> matplotlib.figure.Figure:
        gridspec_kw = dict(gridspec_kw)
        data_to_plot = self.data[:, channels, :]
        re = data_to_plot[:, :, 0]
        im = data_to_plot[:, :, 1]

        frequencies = self.frequencies[channels]
        nchannels = re.shape[1]

        if nx is None and ny is not None:
            nx = math.ceil(nchannels / ny)
        elif nx is not None and ny is None:
            ny = math.ceil(nchannels / nx)
        elif nx is not None and ny is not None:
            if nx * ny > nchannels:
                raise CHEFValueError(f"Inconsistent number of channels: nx * ny = {nx * ny} > nchannels = {nchannels}")
        else:
            ny = nx = math.ceil(math.sqrt(nchannels))

        if fig_kw is None: fig_kw = {}
        fig, axes = plt.subplots(
            ny, nx, squeeze=False, subplot_kw=subplot_kw, gridspec_kw=gridspec_kw,
            sharex="all", sharey="all",
            **fig_kw
        )
        axes.flatten()[0].invert_xaxis()
        for ax in axes.flatten(): ax.set_visible(False)
        xplot = u.Quantity([*self.u, *(-self.u)])
        yplot = u.Quantity([*self.v, *(-self.v)])

        visibility = re + 1j * im
        visibility = [*visibility, *visibility] * u.Jy
        color = np.angle(visibility)
        sizes = np.abs(visibility)
        sizes /= sizes.max() * symsize

        for vis, freq, s, col, ax in zip(visibility.T, frequencies, sizes.T, color.T, axes.flatten()):
            ax.set_visible(True)
            restwl = freq.to(u.m, equivalencies=u.spectral())
            xxplot = (xplot / restwl).to(self.kl)
            yyplot = (yplot / restwl).to(self.kl)
            ax.scatter(xxplot, yyplot, s=s, c=col, cmap="twilight", **kwargs)
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)

        axes[ny - 1, 0].get_xaxis().set_visible(True)
        axes[ny - 1, 0].get_yaxis().set_visible(True)

        return fig

    def plot_total_power(
            self,
            rest_freq: u.Hz = None,
            axes: matplotlib.axes.Axes = None,
            axes_unit: Union[u.Unit, Literal["channel"]] = None,
    ) -> matplotlib.axes.Axes:
        """Plot the sum of visibilities collected by the interferometer.

        This is not actually the total power, but gives the idea for the bright lines

        Args:
            rest_freq: u.Hz - the rest frequency of the line to convert axes to km/s
            axes: matplotlib.axes.Axes - the axes to plot on
            axes_unit: u.Unit - the unit to convert the axes to, defaults to the original axes unit (likely Hz)
        """
        if rest_freq is not None:
            frequencies = self.frequencies.to(u.km / u.s, equivalencies=u.doppler_radio(rest_freq))
        else:
            frequencies = self.frequencies
        if axes_unit is not None:
            if axes_unit == "channel":
                frequencies = np.arange(len(frequencies))
            else:
                frequencies = frequencies.to(axes_unit)
        if axes is None:
            _, axes = plt.subplots(1)
        axes.plot(
            frequencies,
            np.sum(np.abs((self.visibility * self.weight) / self.weight.sum()), axis=0)
        )
        return axes

    def image_to_visibilities(self, cube: Union[spectral_cube.SpectralCube, PathLike]):
        """Import cube from a FITS `file`, sample it with visibilities of this UVFITS"""
        if isinstance(cube, spectral_cube.SpectralCube):
            pass
        else:
            cube = spectral_cube.SpectralCube.read(cube)
        pixel_area_units = u.Unit(cube.wcs.celestial.world_axis_units[0]) * u.Unit(
            cube.wcs.celestial.world_axis_units[1])
        pixel_area = astropy.wcs.utils.proj_plane_pixel_area(cube.wcs.celestial) * pixel_area_units
        dxy = np.sqrt(pixel_area).to_value(u.rad)
        cube = (cube.to(u.Jy / u.sr) * pixel_area).to(u.Jy)

        visibilities = []
        for i, frequency in enumerate(
                cube.with_spectral_unit(u.Hz, velocity_convention="radio").spectral_axis
        ):
            wl = (c.c / frequency).si
            u_wavelengths = (self.u / wl).to_value(u.dimensionless_unscaled)
            v_wavelengths = (self.v / wl).to_value(u.dimensionless_unscaled)
            vis = g_double.sampleImage(cube[i], dxy, u_wavelengths, v_wavelengths, origin='lower')
            visibilities.append(vis)
        visibilities = np.array(visibilities)
        visibilities_real_imag_weight = np.array(
            [visibilities.real, visibilities.imag, np.ones_like(visibilities.imag)]
        )
        uvdata_arr = visibilities_real_imag_weight.T.reshape(
            visibilities.shape[1], 1, 1, 1, visibilities.shape[0], 1, 3
        )
        self.set_data(uvdata_arr, cube.with_spectral_unit(u.Hz).spectral_axis)
        self.frequencies = cube.spectral_axis

    @property
    @u.quantity_input
    def visibility(self) -> u.Jy:
        """Returns complex array of visibities"""
        return self.re + self.im * 1j

    def pickle(self, filename: PathLike = None):
        """Pickle the UV coordinates so they can be reloaded later

        Args:
            filename: Pathlike, default: self.path + ".pkl". Should end with ".pkl"`"
        """
        if filename is None:
            filename = str(self.path) + ".pkl"
        else:
            if not filename.lower().endswith(".pkl"):
                self.logger.warning("If the pickle filename does not end with .pkl, "
                                    "it won't be automatically recognized")
        with open(filename, "wb") as uvpkl:
            pickle.dump((self.u, self.v), uvpkl)

    @property
    def degrees_of_freedom(self):
        return np.prod(self.re.shape)

    def chi2_with(self, data=Union[PathLike, spectral_cube.SpectralCube], threads: int = None, **kwargs) -> float:
        """Method to calculate chi-squared of a given UV set with a data cube

        Args:
            data: PathLike -- path to data cube readable by SpectralCube.read OR the spectral cube itself
            threads: int -- number of threads for galario. Sets globally until called again with non-default value.
        """
        if threads is not None:
            g_double.threads(threads)

        if not isinstance(data, spectral_cube.SpectralCube):
            data = spectral_cube.SpectralCube.read(data)
        if data.spectral_axis.unit != self.frequencies.unit:
            data = data.with_spectral_unit(self.frequencies.unit, velocity_convention="radio")
        self.logger.debug("Data spectral axis: %s", data.spectral_axis)
        self.logger.debug("UVTable spectral axis: %s", self.frequencies)
        if (data.spectral_axis.shape != self.frequencies.shape) or (
                not np.all(np.equal(data.spectral_axis, self.frequencies))):
            self.logger.info("Interpolate data to UVTable spectral grid")
            data = data.spectral_interpolate(spectral_grid=self.frequencies)

        pixel_area_units = u.Unit(data.wcs.celestial.world_axis_units[0]) \
                           * u.Unit(data.wcs.celestial.world_axis_units[1])
        pixel_area = astropy.wcs.utils.proj_plane_pixel_area(data.wcs.celestial) * pixel_area_units
        dxy = np.sqrt(pixel_area).to_value(u.rad)
        self.logger.debug("Data pixel area %s and size %s radian", pixel_area, dxy)
        data = (data.to(u.Jy / u.sr) * pixel_area).to(u.Jy)
        self.chi_per_channel = []
        for cube_slice, _wavelength, _re, _im, _weight in zip(
                data, self.wavelengths, self.re.T, self.im.T, self.weight.T
        ):
            chi_per_channel = g_double.chi2Image(
                image=cube_slice,
                dxy=dxy,
                u=(self.u / _wavelength).to_value(u.dimensionless_unscaled),
                v=(self.v / _wavelength).to_value(u.dimensionless_unscaled),
                vis_obs_re=_re.astype('float64').to_value(u.Jy),
                vis_obs_im=_im.astype('float64').to_value(u.Jy),
                vis_obs_w=_weight.astype('float64').to_value(u.Jy ** -2),
                origin='lower',
                **kwargs
            )
            self.chi_per_channel.append(chi_per_channel)
        return float(np.sum(self.chi_per_channel))

    @classmethod
    def write_visibilities_to_uvfits(
            cls,
            data_to_write: Union[PathLike, spectral_cube.SpectralCube],
            file_to_modify: PathLike,
            output_filename: PathLike = None,
            uv_kwargs: dict = None,
            residual: bool = False,
    ) -> PathLike:
        """
        Replace visibilities of `file_to_modify` to visibilities sampled from `data_to_write`,
        save into `output_filename`.

        Args:
            data_to_write: PathLike or SpectralCube, data to replace the original table
            file_to_modify: PathLike, uvfits file with visibilities and frequencies to use as a reference
            output_filename: PathLike, optional: name of new uvfits file, `file_to_modify`.modified.uvfits by default
            uv_kwargs: kwargs to be passed to UVFits initialization, ie `sum` or `channel`
            residual: if True, write input residuals `data_to_be_modified - data_to_write` instead of `data_to_write`
        Returns:
            output file name

        Usage:
            >>> UVFits.write_visibilities_to_uvfits(  # doctest: +SKIP
            ...     data_to_write="13CO J=2-1_image.fits",
            ...     file_to_modify="13co.uvfits",
            ...     output_filename="13co_model.uvfits",
            ...     uv_kwargs={'sum': False}
            ... )
        """
        if uv_kwargs is None:
            uv_kwargs = {}
        if output_filename is None:
            output_filename = Path(f"{file_to_modify}.modified.uvfits")

        if not isinstance(data_to_write, spectral_cube.SpectralCube):
            data_to_write = spectral_cube.SpectralCube.read(data_to_write)

        uvfits = cls(file_to_modify, **uv_kwargs)
        uvfits.image_to_visibilities(
            data_to_write.spectral_interpolate(uvfits.frequencies)
        )
        data_to_write_array = np.array(uvfits.data)

        hdul_to_modify = astropy.io.fits.open(file_to_modify, lazy_load_hdus=False, memmap=False)
        data_to_be_modified = hdul_to_modify[0].data["DATA"][:, 0, 0, 0, :, 0, :]
        if residual:
            data_to_be_modified[:, :, 0:2] -= data_to_write_array[:, :, 0:2]
        else:
            data_to_be_modified[:, :, 0:2] = data_to_write_array[:, :, 0:2]

        hdul_to_modify.writeto(output_filename, overwrite=True)

        return output_filename

    @classmethod
    def run_gildas_script(
            cls,
            script: str = "SAY HELLO WORLD",
            gildas_executable: PathLike = "imager",
            script_filename: PathLike = "last.imager",
            folder=None,
    ) -> subprocess.CompletedProcess:
        """
        Send script to GILDAS.

        Args:
            script: GILDAS script to run
            gildas_executable: path to GILDAS executable (imager, astro, etc.)
            script_filename: filename for script as it will be saved as a file
            folder: working directory for script

        Returns:
            subprocess.CompletedProcess instance with .stdout and .stderr attributes
        """
        if folder is None:
            folder = Path.cwd()

        script_filename = Path(script_filename)
        with open(script_filename, "w") as fff:
            fff.write(textwrap.dedent(script))

        command = f'cat "{script_filename.resolve()}" | {gildas_executable} -nw'
        logging.debug("%s$ %s", folder, command)
        proc = subprocess.run(
            command,
            capture_output=True, encoding='utf8', shell=True,
            cwd=folder
        )
        return proc

    @classmethod
    def run_imaging(
            cls,
            input_file: PathLike,
            name: str,
            imager_executable: PathLike = "imager",
            script_template: str = """
                        FITS "{input_file}" TO "{name}"
                        READ UV "{name}"
                        UV_MAP
                        CLEAN
                        LUT {lut}
                        ! VIEW CLEAN /NOPAUSE
                        LET SIZE 10
                        LET DO_CONTOUR NO
                        SHOW CLEAN
                        HARDCOPY "{name}.{device}" /DEVICE {device} /OVERWRITE
                """,
            script_filename: PathLike = "last.imager",
            device: Union[Literal["pdf", "png", "eps", "ps"], str] = "pdf",
            lut: str = "inferno",
            **kwargs
    ) -> subprocess.CompletedProcess:
        """
        Send script to GILDAS. The primary use is to send an imaging script to IMAGER

        Args:
            input_file: path to .uvfits file to be imaged, passed to `script_template.format
            name: basename for the output files, passed to `script_template.format. Whitespaces are replaced with '_'
            imager_executable: path to imager executable, if not in system PATH
            script_template: string containing script with parameters listed in {} for .format
            script_filename: the filename for the created script
            device: DEVICE keyword for `[GTVL\]HARDCOPY`, the output figure file format,
                passed to `script_template.format
            lut: argument for `[GTVL\]LUT` for colormap setting, `inferno` by default, passed to `script_template.format
            **kwargs: other arguments passed to `script_template.format`

        Returns:
            subprocess.CompletedProcess instance with .stdout and .stderr attributes

        Usage:
            >>> UVFits.run_imaging(input_file="model.uvfits", name="model")  # doctest: +SKIP
        """

        script_filename = Path(script_filename)
        input_file = Path(input_file)

        proc = cls.run_gildas_script(
            script=script_template.format(
                input_file=input_file.name,
                name=name,
                lut=lut,
                device=device,
                **kwargs
            ),
            gildas_executable=imager_executable,
            script_filename=script_filename,
            folder=input_file.parent
        )
        return proc

    @classmethod
    def run_gildas(cls, *args, **kwargs):
        """Deprecated, renamed to run_imaging. Alsom see run_gildas_script to run arbitrary GILDAS scripts."""
        warnings.warn("run_gildas is deprecated, use run_imaging instead", DeprecationWarning)
        return cls.run_imaging(*args, **kwargs)

Class variables

var kl

Static methods

def run_gildas(*args, **kwargs)

Deprecated, renamed to run_imaging. Alsom see run_gildas_script to run arbitrary GILDAS scripts.

Expand source code
@classmethod
def run_gildas(cls, *args, **kwargs):
    """Deprecated, renamed to run_imaging. Alsom see run_gildas_script to run arbitrary GILDAS scripts."""
    warnings.warn("run_gildas is deprecated, use run_imaging instead", DeprecationWarning)
    return cls.run_imaging(*args, **kwargs)
def run_gildas_script(script: str = 'SAY HELLO WORLD', gildas_executable: Union[str, os.PathLike] = 'imager', script_filename: Union[str, os.PathLike] = 'last.imager', folder=None) ‑> subprocess.CompletedProcess

Send script to GILDAS.

Args

script
GILDAS script to run
gildas_executable
path to GILDAS executable (imager, astro, etc.)
script_filename
filename for script as it will be saved as a file
folder
working directory for script

Returns

subprocess.CompletedProcess instance with .stdout and .stderr attributes

Expand source code
@classmethod
def run_gildas_script(
        cls,
        script: str = "SAY HELLO WORLD",
        gildas_executable: PathLike = "imager",
        script_filename: PathLike = "last.imager",
        folder=None,
) -> subprocess.CompletedProcess:
    """
    Send script to GILDAS.

    Args:
        script: GILDAS script to run
        gildas_executable: path to GILDAS executable (imager, astro, etc.)
        script_filename: filename for script as it will be saved as a file
        folder: working directory for script

    Returns:
        subprocess.CompletedProcess instance with .stdout and .stderr attributes
    """
    if folder is None:
        folder = Path.cwd()

    script_filename = Path(script_filename)
    with open(script_filename, "w") as fff:
        fff.write(textwrap.dedent(script))

    command = f'cat "{script_filename.resolve()}" | {gildas_executable} -nw'
    logging.debug("%s$ %s", folder, command)
    proc = subprocess.run(
        command,
        capture_output=True, encoding='utf8', shell=True,
        cwd=folder
    )
    return proc
def run_imaging(input_file: Union[str, os.PathLike], name: str, imager_executable: Union[str, os.PathLike] = 'imager', script_template: str = '\n FITS "{input_file}" TO "{name}"\n READ UV "{name}"\n UV_MAP\n CLEAN\n LUT {lut}\n ! VIEW CLEAN /NOPAUSE\n LET SIZE 10\n LET DO_CONTOUR NO\n SHOW CLEAN\n HARDCOPY "{name}.{device}" /DEVICE {device} /OVERWRITE\n ', script_filename: Union[str, os.PathLike] = 'last.imager', device: Union[Literal['pdf', 'png', 'eps', 'ps'], str] = 'pdf', lut: str = 'inferno', **kwargs) ‑> subprocess.CompletedProcess

Send script to GILDAS. The primary use is to send an imaging script to IMAGER

Args

input_file
path to .uvfits file to be imaged, passed to `script_template.format
name
basename for the output files, passed to `script_template.format. Whitespaces are replaced with '_'
imager_executable
path to imager executable, if not in system PATH
script_template
string containing script with parameters listed in {} for .format
script_filename
the filename for the created script
device
DEVICE keyword for [GTVL\]HARDCOPY, the output figure file format, passed to `script_template.format
lut
argument for [GTVL\]LUT for colormap setting, inferno by default, passed to `script_template.format
**kwargs
other arguments passed to script_template.format

Returns

subprocess.CompletedProcess instance with .stdout and .stderr attributes

Usage

>>> UVFits.run_imaging(input_file="model.uvfits", name="model")  # doctest: +SKIP
Expand source code
@classmethod
def run_imaging(
        cls,
        input_file: PathLike,
        name: str,
        imager_executable: PathLike = "imager",
        script_template: str = """
                    FITS "{input_file}" TO "{name}"
                    READ UV "{name}"
                    UV_MAP
                    CLEAN
                    LUT {lut}
                    ! VIEW CLEAN /NOPAUSE
                    LET SIZE 10
                    LET DO_CONTOUR NO
                    SHOW CLEAN
                    HARDCOPY "{name}.{device}" /DEVICE {device} /OVERWRITE
            """,
        script_filename: PathLike = "last.imager",
        device: Union[Literal["pdf", "png", "eps", "ps"], str] = "pdf",
        lut: str = "inferno",
        **kwargs
) -> subprocess.CompletedProcess:
    """
    Send script to GILDAS. The primary use is to send an imaging script to IMAGER

    Args:
        input_file: path to .uvfits file to be imaged, passed to `script_template.format
        name: basename for the output files, passed to `script_template.format. Whitespaces are replaced with '_'
        imager_executable: path to imager executable, if not in system PATH
        script_template: string containing script with parameters listed in {} for .format
        script_filename: the filename for the created script
        device: DEVICE keyword for `[GTVL\]HARDCOPY`, the output figure file format,
            passed to `script_template.format
        lut: argument for `[GTVL\]LUT` for colormap setting, `inferno` by default, passed to `script_template.format
        **kwargs: other arguments passed to `script_template.format`

    Returns:
        subprocess.CompletedProcess instance with .stdout and .stderr attributes

    Usage:
        >>> UVFits.run_imaging(input_file="model.uvfits", name="model")  # doctest: +SKIP
    """

    script_filename = Path(script_filename)
    input_file = Path(input_file)

    proc = cls.run_gildas_script(
        script=script_template.format(
            input_file=input_file.name,
            name=name,
            lut=lut,
            device=device,
            **kwargs
        ),
        gildas_executable=imager_executable,
        script_filename=script_filename,
        folder=input_file.parent
    )
    return proc
def write_visibilities_to_uvfits(data_to_write: Union[str, os.PathLike, spectral_cube.spectral_cube.SpectralCube], file_to_modify: Union[str, os.PathLike], output_filename: Union[str, os.PathLike] = None, uv_kwargs: dict = None, residual: bool = False) ‑> Union[str, os.PathLike]

Replace visibilities of file_to_modify to visibilities sampled from data_to_write, save into output_filename.

Args

data_to_write
PathLike or SpectralCube, data to replace the original table
file_to_modify
PathLike, uvfits file with visibilities and frequencies to use as a reference
output_filename
PathLike, optional: name of new uvfits file, file_to_modify.modified.uvfits by default
uv_kwargs
kwargs to be passed to UVFits initialization, ie sum or channel
residual
if True, write input residuals data_to_be_modified - data_to_write instead of data_to_write

Returns

output file name

Usage

>>> UVFits.write_visibilities_to_uvfits(  # doctest: +SKIP
...     data_to_write="13CO J=2-1_image.fits",
...     file_to_modify="13co.uvfits",
...     output_filename="13co_model.uvfits",
...     uv_kwargs={'sum': False}
... )
Expand source code
@classmethod
def write_visibilities_to_uvfits(
        cls,
        data_to_write: Union[PathLike, spectral_cube.SpectralCube],
        file_to_modify: PathLike,
        output_filename: PathLike = None,
        uv_kwargs: dict = None,
        residual: bool = False,
) -> PathLike:
    """
    Replace visibilities of `file_to_modify` to visibilities sampled from `data_to_write`,
    save into `output_filename`.

    Args:
        data_to_write: PathLike or SpectralCube, data to replace the original table
        file_to_modify: PathLike, uvfits file with visibilities and frequencies to use as a reference
        output_filename: PathLike, optional: name of new uvfits file, `file_to_modify`.modified.uvfits by default
        uv_kwargs: kwargs to be passed to UVFits initialization, ie `sum` or `channel`
        residual: if True, write input residuals `data_to_be_modified - data_to_write` instead of `data_to_write`
    Returns:
        output file name

    Usage:
        >>> UVFits.write_visibilities_to_uvfits(  # doctest: +SKIP
        ...     data_to_write="13CO J=2-1_image.fits",
        ...     file_to_modify="13co.uvfits",
        ...     output_filename="13co_model.uvfits",
        ...     uv_kwargs={'sum': False}
        ... )
    """
    if uv_kwargs is None:
        uv_kwargs = {}
    if output_filename is None:
        output_filename = Path(f"{file_to_modify}.modified.uvfits")

    if not isinstance(data_to_write, spectral_cube.SpectralCube):
        data_to_write = spectral_cube.SpectralCube.read(data_to_write)

    uvfits = cls(file_to_modify, **uv_kwargs)
    uvfits.image_to_visibilities(
        data_to_write.spectral_interpolate(uvfits.frequencies)
    )
    data_to_write_array = np.array(uvfits.data)

    hdul_to_modify = astropy.io.fits.open(file_to_modify, lazy_load_hdus=False, memmap=False)
    data_to_be_modified = hdul_to_modify[0].data["DATA"][:, 0, 0, 0, :, 0, :]
    if residual:
        data_to_be_modified[:, :, 0:2] -= data_to_write_array[:, :, 0:2]
    else:
        data_to_be_modified[:, :, 0:2] = data_to_write_array[:, :, 0:2]

    hdul_to_modify.writeto(output_filename, overwrite=True)

    return output_filename

Instance variables

var data
Expand source code
@property
def data(self):
    return self._fits['DATA'][:, 0, 0, 0, :, 0, :]
var degrees_of_freedom
Expand source code
@property
def degrees_of_freedom(self):
    return np.prod(self.re.shape)
var rest_freq : Unit("Hz")
Expand source code
@property
def rest_freq(self) -> u.Hz:
    return self._fits.meta.get("RESTFREQ", None) * u.Hz
var visibility : Unit("Jy")

Returns complex array of visibities

Expand source code
@property
@u.quantity_input
def visibility(self) -> u.Jy:
    """Returns complex array of visibities"""
    return self.re + self.im * 1j
var wavelengths
Expand source code
@property
def wavelengths(self):
    return c.c / self.frequencies

Methods

def chi2_with(self, data=typing.Union[str, os.PathLike, spectral_cube.spectral_cube.SpectralCube], threads: int = None, **kwargs) ‑> float

Method to calculate chi-squared of a given UV set with a data cube

Args

data
PathLike – path to data cube readable by SpectralCube.read OR the spectral cube itself
threads
int – number of threads for galario. Sets globally until called again with non-default value.
Expand source code
def chi2_with(self, data=Union[PathLike, spectral_cube.SpectralCube], threads: int = None, **kwargs) -> float:
    """Method to calculate chi-squared of a given UV set with a data cube

    Args:
        data: PathLike -- path to data cube readable by SpectralCube.read OR the spectral cube itself
        threads: int -- number of threads for galario. Sets globally until called again with non-default value.
    """
    if threads is not None:
        g_double.threads(threads)

    if not isinstance(data, spectral_cube.SpectralCube):
        data = spectral_cube.SpectralCube.read(data)
    if data.spectral_axis.unit != self.frequencies.unit:
        data = data.with_spectral_unit(self.frequencies.unit, velocity_convention="radio")
    self.logger.debug("Data spectral axis: %s", data.spectral_axis)
    self.logger.debug("UVTable spectral axis: %s", self.frequencies)
    if (data.spectral_axis.shape != self.frequencies.shape) or (
            not np.all(np.equal(data.spectral_axis, self.frequencies))):
        self.logger.info("Interpolate data to UVTable spectral grid")
        data = data.spectral_interpolate(spectral_grid=self.frequencies)

    pixel_area_units = u.Unit(data.wcs.celestial.world_axis_units[0]) \
                       * u.Unit(data.wcs.celestial.world_axis_units[1])
    pixel_area = astropy.wcs.utils.proj_plane_pixel_area(data.wcs.celestial) * pixel_area_units
    dxy = np.sqrt(pixel_area).to_value(u.rad)
    self.logger.debug("Data pixel area %s and size %s radian", pixel_area, dxy)
    data = (data.to(u.Jy / u.sr) * pixel_area).to(u.Jy)
    self.chi_per_channel = []
    for cube_slice, _wavelength, _re, _im, _weight in zip(
            data, self.wavelengths, self.re.T, self.im.T, self.weight.T
    ):
        chi_per_channel = g_double.chi2Image(
            image=cube_slice,
            dxy=dxy,
            u=(self.u / _wavelength).to_value(u.dimensionless_unscaled),
            v=(self.v / _wavelength).to_value(u.dimensionless_unscaled),
            vis_obs_re=_re.astype('float64').to_value(u.Jy),
            vis_obs_im=_im.astype('float64').to_value(u.Jy),
            vis_obs_w=_weight.astype('float64').to_value(u.Jy ** -2),
            origin='lower',
            **kwargs
        )
        self.chi_per_channel.append(chi_per_channel)
    return float(np.sum(self.chi_per_channel))
def image_to_visibilities(self, cube: Union[str, os.PathLike, spectral_cube.spectral_cube.SpectralCube])

Import cube from a FITS file, sample it with visibilities of this UVFITS

Expand source code
def image_to_visibilities(self, cube: Union[spectral_cube.SpectralCube, PathLike]):
    """Import cube from a FITS `file`, sample it with visibilities of this UVFITS"""
    if isinstance(cube, spectral_cube.SpectralCube):
        pass
    else:
        cube = spectral_cube.SpectralCube.read(cube)
    pixel_area_units = u.Unit(cube.wcs.celestial.world_axis_units[0]) * u.Unit(
        cube.wcs.celestial.world_axis_units[1])
    pixel_area = astropy.wcs.utils.proj_plane_pixel_area(cube.wcs.celestial) * pixel_area_units
    dxy = np.sqrt(pixel_area).to_value(u.rad)
    cube = (cube.to(u.Jy / u.sr) * pixel_area).to(u.Jy)

    visibilities = []
    for i, frequency in enumerate(
            cube.with_spectral_unit(u.Hz, velocity_convention="radio").spectral_axis
    ):
        wl = (c.c / frequency).si
        u_wavelengths = (self.u / wl).to_value(u.dimensionless_unscaled)
        v_wavelengths = (self.v / wl).to_value(u.dimensionless_unscaled)
        vis = g_double.sampleImage(cube[i], dxy, u_wavelengths, v_wavelengths, origin='lower')
        visibilities.append(vis)
    visibilities = np.array(visibilities)
    visibilities_real_imag_weight = np.array(
        [visibilities.real, visibilities.imag, np.ones_like(visibilities.imag)]
    )
    uvdata_arr = visibilities_real_imag_weight.T.reshape(
        visibilities.shape[1], 1, 1, 1, visibilities.shape[0], 1, 3
    )
    self.set_data(uvdata_arr, cube.with_spectral_unit(u.Hz).spectral_axis)
    self.frequencies = cube.spectral_axis
def pickle(self, filename: Union[str, os.PathLike] = None)

Pickle the UV coordinates so they can be reloaded later

Args

filename
Pathlike, default: self.path + ".pkl". Should end with ".pkl"`"
Expand source code
def pickle(self, filename: PathLike = None):
    """Pickle the UV coordinates so they can be reloaded later

    Args:
        filename: Pathlike, default: self.path + ".pkl". Should end with ".pkl"`"
    """
    if filename is None:
        filename = str(self.path) + ".pkl"
    else:
        if not filename.lower().endswith(".pkl"):
            self.logger.warning("If the pickle filename does not end with .pkl, "
                                "it won't be automatically recognized")
    with open(filename, "wb") as uvpkl:
        pickle.dump((self.u, self.v), uvpkl)
def plot_total_power(self, rest_freq: Unit("Hz") = None, axes: matplotlib.axes._axes.Axes = None, axes_unit: Union[astropy.units.core.Unit, Literal['channel']] = None) ‑> matplotlib.axes._axes.Axes

Plot the sum of visibilities collected by the interferometer.

This is not actually the total power, but gives the idea for the bright lines

Args

rest_freq
u.Hz - the rest frequency of the line to convert axes to km/s
axes
matplotlib.axes.Axes - the axes to plot on
axes_unit
u.Unit - the unit to convert the axes to, defaults to the original axes unit (likely Hz)
Expand source code
def plot_total_power(
        self,
        rest_freq: u.Hz = None,
        axes: matplotlib.axes.Axes = None,
        axes_unit: Union[u.Unit, Literal["channel"]] = None,
) -> matplotlib.axes.Axes:
    """Plot the sum of visibilities collected by the interferometer.

    This is not actually the total power, but gives the idea for the bright lines

    Args:
        rest_freq: u.Hz - the rest frequency of the line to convert axes to km/s
        axes: matplotlib.axes.Axes - the axes to plot on
        axes_unit: u.Unit - the unit to convert the axes to, defaults to the original axes unit (likely Hz)
    """
    if rest_freq is not None:
        frequencies = self.frequencies.to(u.km / u.s, equivalencies=u.doppler_radio(rest_freq))
    else:
        frequencies = self.frequencies
    if axes_unit is not None:
        if axes_unit == "channel":
            frequencies = np.arange(len(frequencies))
        else:
            frequencies = frequencies.to(axes_unit)
    if axes is None:
        _, axes = plt.subplots(1)
    axes.plot(
        frequencies,
        np.sum(np.abs((self.visibility * self.weight) / self.weight.sum()), axis=0)
    )
    return axes
def plot_uv_channel_map(self, nx: int = None, ny: int = None, channels: Union[range, List[int], Literal[...]] = Ellipsis, subplot_kw: dict = None, gridspec_kw: dict = (('hspace', 0), ('wspace', 0)), fig_kw: dict = None, symsize: float = 0.5, **kwargs) ‑> matplotlib.figure.Figure
Expand source code
def plot_uv_channel_map(
        self,
        nx: int = None, ny: int = None,
        channels: Union[range, List[int], Literal[Ellipsis]] = Ellipsis,
        subplot_kw: dict = None,
        gridspec_kw: dict = (("hspace", 0), ("wspace", 0)),
        fig_kw: dict = None,
        symsize: float = 0.5,
        **kwargs
) -> matplotlib.figure.Figure:
    gridspec_kw = dict(gridspec_kw)
    data_to_plot = self.data[:, channels, :]
    re = data_to_plot[:, :, 0]
    im = data_to_plot[:, :, 1]

    frequencies = self.frequencies[channels]
    nchannels = re.shape[1]

    if nx is None and ny is not None:
        nx = math.ceil(nchannels / ny)
    elif nx is not None and ny is None:
        ny = math.ceil(nchannels / nx)
    elif nx is not None and ny is not None:
        if nx * ny > nchannels:
            raise CHEFValueError(f"Inconsistent number of channels: nx * ny = {nx * ny} > nchannels = {nchannels}")
    else:
        ny = nx = math.ceil(math.sqrt(nchannels))

    if fig_kw is None: fig_kw = {}
    fig, axes = plt.subplots(
        ny, nx, squeeze=False, subplot_kw=subplot_kw, gridspec_kw=gridspec_kw,
        sharex="all", sharey="all",
        **fig_kw
    )
    axes.flatten()[0].invert_xaxis()
    for ax in axes.flatten(): ax.set_visible(False)
    xplot = u.Quantity([*self.u, *(-self.u)])
    yplot = u.Quantity([*self.v, *(-self.v)])

    visibility = re + 1j * im
    visibility = [*visibility, *visibility] * u.Jy
    color = np.angle(visibility)
    sizes = np.abs(visibility)
    sizes /= sizes.max() * symsize

    for vis, freq, s, col, ax in zip(visibility.T, frequencies, sizes.T, color.T, axes.flatten()):
        ax.set_visible(True)
        restwl = freq.to(u.m, equivalencies=u.spectral())
        xxplot = (xplot / restwl).to(self.kl)
        yyplot = (yplot / restwl).to(self.kl)
        ax.scatter(xxplot, yyplot, s=s, c=col, cmap="twilight", **kwargs)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

    axes[ny - 1, 0].get_xaxis().set_visible(True)
    axes[ny - 1, 0].get_yaxis().set_visible(True)

    return fig
def plot_uvgrid(self, axes: matplotlib.axes._axes.Axes = None, kl: bool = False, restfreq: Unit("Hz") = None, symsize: float = 1, **kwargs) ‑> matplotlib.axes._axes.Axes

Args

axes
axes to draw the map on
kl
if True, plots in dimensionless [kλ], thousands of wavelength. If more than one frequency is present,
and restfreq is not set, the mean is taken
restfreq
u.spectral - equivalent unit, the rest frequency to use when kl is True
kwargs
to be passed to axes.scatter

Returns

axes with the uv grid plotted

Expand source code
def plot_uvgrid(
        self,
        axes: matplotlib.axes.Axes = None,
        kl: bool = False, restfreq: u.Hz = None,
        symsize: float = 1,
        **kwargs
) -> matplotlib.axes.Axes:
    """

    Args:
        axes: axes to draw the map on
        kl: if True, plots in dimensionless [kλ], thousands of wavelength. If more than one frequency is present,
        and restfreq is not set, the mean is taken
        restfreq: u.spectral - equivalent unit, the rest frequency to use when kl is True
        kwargs: to be passed to axes.scatter

    Returns:
        axes with the uv grid plotted
    """
    if axes is None:
        _, axes = plt.subplots(1)
        axes.set_aspect('equal')

    xplot = u.Quantity([*self.u, *(-self.u)])
    yplot = u.Quantity([*self.v, *(-self.v)])

    if restfreq is None:
        restfreq = np.median(self.frequencies)
    restwl = restfreq.to(u.m, equivalencies=u.spectral())
    if kl:
        xplot = (xplot / restwl).to(self.kl)
        yplot = (yplot / restwl).to(self.kl)

    sizes = symsize * self.weight[:, 0] / (self.weight.max())
    sizes = np.array([*sizes, *sizes])
    axes.invert_xaxis()
    axes.scatter(xplot, yplot, alpha=0.5, s=sizes, **kwargs)
    return axes
def set_data(self, data: numpy.ndarray, frequencies: Unit("Hz") = None)

Set new visibilities data (and, optionally, frequencies) to UVFits instance

Expand source code
@u.quantity_input
def set_data(self, data: np.ndarray, frequencies: u.Hz = None):
    """Set new visibilities data (and, optionally, frequencies) to UVFits instance"""
    if self._fits is not None:
        if self._fits["DATA"].shape != data.shape:
            if frequencies.shape != self.data.shape[1] and self.frequencies is None:
                raise CHEFValueError("Shape mismatch!")
            self.frequencies = frequencies
        self._update_fits(data, frequencies)
    else:
        self.re = data[:, 0, 0, 0, :, 0, :][:, :, 0] << u.Jy
        self.im = data[:, 0, 0, 0, :, 0, :][:, :, 1] << u.Jy
        self.weight = data[:, 0, 0, 0, :, 0, :][:, :, 2] << (u.Jy ** -2)
        self.table['Re'] = self.re
        self.table['Im'] = self.im
        self.table['Weight'] = self.weight