Module diskchef.uv.uvfits_to_visibilities_ascii

Module with functions to convert GILDAS UVTable to GALARIO visibilities format

Classes

class UVFits (path: str | os.PathLike,
channel: int | Sequence | slice | Literal['all'] = 'all',
sum: bool = False)
Expand source code
class UVFits:
    """
    Reads GILDAS-outputted visibilities UVFits file

    Args:
        path: PathLike -- path to the uv fits table
        channel: which channel to take from the file
            int -- single channel
            list -- given channels
            slice -- (a:b:c == slice(a,b,c)) -- slice of channels
            'all' -- all channels from the file
        sum: bool -- calculate weigthed mean of all channels (for continuum)

    Usage:

    >>> uv = UVFits(
    ...     os.path.join(os.path.dirname(__file__), "..", "tests", "data", "s-Wide-1+C.uvfits"),
    ...     'all', sum=True
    ... )
    >>> uv.table[0:5].pprint_all()  # doctest: +NORMALIZE_WHITESPACE
             u                   v                 Re [1]               Im [1]            Weight [1]
             m                   m                   Jy                   Jy               1 / Jy2
    ------------------- ------------------- ------------------- --------------------- ------------------
    -0.8546872724500936 -111.88782560913619 0.08481375196790505   0.02011162435339697  4.392747296119472
      35.68536730327412   61.11044271472704 0.14395800105016973 0.0011129999808199087  4.906656253315232
      36.53999262455663  172.99830358929054 0.04616766278918842 -0.009757890697913866  4.810738961168015
     -82.12765585917153 -41.241167812120636 0.14572769842806682  0.012205666181597085 2.6732435871384994
     -81.27303053788901   70.64653734266903  0.1260857611090533 -0.018951768666146913  2.634196780195305

    >>> uv = UVFits(
    ...     os.path.join(os.path.dirname(__file__), "..", "tests", "data", "s-Wide-1+C.uvfits"),
    ...     [1,2,3], sum=False
    ... )
    >>> uv.table[0:5].pprint_all()  # doctest: +NORMALIZE_WHITESPACE
             u                   v                             Re [3]                                        Im [3]                                     Weight [3]
             m                   m                               Jy                                            Jy                                          1 / Jy2
    ------------------- ------------------- ------------------------------------------- ----------------------------------------------- ------------------------------------------
    -0.8546872724500936 -111.88782560913619  0.08013948631211772 .. 0.07302437694644187    0.022133425091247466 .. 0.013778203363983665 0.19701689428642943 .. 0.20917844580050068
      35.68536730327412   61.11044271472704   0.14916945376238983 .. 0.1606281736906061   0.004351084873651592 .. -0.008113799327774212 0.22006599150348807 .. 0.23365029837275292
      36.53999262455663  172.99830358929054 0.02026272564921367 .. 0.051114215373805026 -0.007280084597144622 .. -0.0062913988333082845 0.21576404157411666 .. 0.22908280648299315
     -82.12765585917153 -41.241167812120636  0.13376226427333338 .. 0.18171489805253205     0.014664527234772186 .. 0.03560468549769666 0.11989631175397047 .. 0.12729731479045647
     -81.27303053788901   70.64653734266903   0.1422663387734871 .. 0.15214664685856188  -0.008367051589066395 .. -0.017851764309400768 0.11814501360102375 .. 0.12543794548908482
    """

    def __init__(
            self,
            path: PathLike,
            channel: Union[int, Sequence, slice, Literal['all']] = 'all',
            sum: bool = False
    ):
        self.logger = logging.getLogger(__name__ + '.' + self.__class__.__qualname__)
        self.logger.info("Creating an instance of %s", self.__class__.__qualname__)
        self.logger.debug("With parameters: %s", self.__dict__)

        self.path = os.path.abspath(path)
        self.restfreq = None
        if self.path.lower().endswith((".uvfits", ".fits")):
            self._fits = Table.read(path, hdu=0, memmap=False)
            self.table = QTable()
            self.u = self._fits['UU'].data * (u.s * c.c)
            self.v = self._fits['VV'].data * (u.s * c.c)
            if channel in ('all', Ellipsis):
                self.fetch_channel = Ellipsis
            elif isinstance(channel, int):
                self.fetch_channel = slice(channel, channel + 1)
            elif isinstance(channel, (slice, Sequence)):
                self.fetch_channel = channel
            else:
                raise CHEFTypeError("channel should be either: int, Sequence, slice, 'all', Ellipsis'")
            data = self._fits['DATA'][:, 0, 0, 0, self.fetch_channel, 0, :]
            self.fetched_channels = np.arange(self._fits['DATA'].shape[4])[self.fetch_channel]
            self.re = data[:, :, 0] << u.Jy
            self.im = data[:, :, 1] << u.Jy
            self.weight = data[:, :, 2] << (u.Jy ** -2)
            if sum:
                total_weight = np.sum(self.weight, axis=1, keepdims=True)
                self.re = np.sum(self.weight * self.re, axis=1, keepdims=True) / total_weight
                self.im = np.sum(self.weight * self.im, axis=1, keepdims=True) / total_weight
                self.weight = total_weight
                self.fetched_channels = np.mean(self.fetched_channels, keepdims=True)
            self._update_table()
            self.frequencies = (
                    (self._fits.meta['CRVAL4'] +
                     (self.fetched_channels - self._fits.meta['CRPIX4'] + 1) * self._fits.meta['CDELT4']
                     ) * u.Hz)
            self.restfreq = self._fits.meta.get("RESTFREQ", None)
        elif self.path.lower().endswith(".pkl"):
            self._fits = None
            with open(self.path, "rb") as pkl:
                self.u, self.v = pickle.load(pkl)
                self.re = np.empty_like(self.u)
                self.im = np.empty_like(self.u)
                self.weight = np.empty_like(self.u)
                self.table = astropy.table.QTable()
                self._update_table()
            self.restfreq = self._fits.meta.get("RESTFREQ", None)
        else:
            raise CHEFValueError(
                "Unknown file format (%s): "
                "expecting '.fits' / '.uvfits' for casa-style UVFITS file "
                "or '.pkl' for previously pickled UVFits instance" % path
            )

    kl = u.def_unit('kλ', 1000 * u.dimensionless_unscaled)

    @property
    def data(self):
        return self._fits['DATA'][:, 0, 0, 0, :, 0, :]

    @data.setter
    def data(self, value):
        if self._fits["DATA"].shape != value.shape:
            raise CHEFValueError("Shape mismatch! Use UVFits.set_data instead!")
        self._update_fits(value)

    def _update_table(self):
        self.table['u'] = self.u
        self.table['v'] = self.v
        self.table['Re'] = self.re
        self.table['Im'] = self.im
        self.table['Weight'] = self.weight

    def _update_fits(self, data, frequencies=None):
        self._fits['DATA'] = data
        self.re = self.data[:, :, 0] << u.Jy
        self.im = self.data[:, :, 1] << u.Jy
        self.weight = self.data[:, :, 2] << (u.Jy ** -2)
        self.table['Re'] = self.re
        self.table['Im'] = self.im
        self.table['Weight'] = self.weight
        self._update_table()

    @u.quantity_input
    def set_data(self, data: np.ndarray, frequencies: u.Hz = None):
        """Set new visibilities data (and, optionally, frequencies) to UVFits instance"""
        if self._fits is not None:
            if self._fits["DATA"].shape != data.shape:
                if frequencies.shape != self.data.shape[1] and self.frequencies is None:
                    raise CHEFValueError("Shape mismatch!")
                self.frequencies = frequencies
            self._update_fits(data, frequencies)
        else:
            self.re = data[:, 0, 0, 0, :, 0, :][:, :, 0] << u.Jy
            self.im = data[:, 0, 0, 0, :, 0, :][:, :, 1] << u.Jy
            self.weight = data[:, 0, 0, 0, :, 0, :][:, :, 2] << (u.Jy ** -2)
            self.table['Re'] = self.re
            self.table['Im'] = self.im
            self.table['Weight'] = self.weight

    @property
    def rest_freq(self) -> u.Hz:
        return self._fits.meta.get("RESTFREQ", None) * u.Hz

    @property
    def wavelengths(self):
        return c.c / self.frequencies

    def plot_uvgrid(
            self,
            axes: matplotlib.axes.Axes = None,
            kl: bool = False, restfreq: u.Hz = None,
            symsize: float = 1,
            **kwargs
    ) -> matplotlib.axes.Axes:
        """

        Args:
            axes: axes to draw the map on
            kl: if True, plots in dimensionless [kλ], thousands of wavelength. If more than one frequency is present,
            and restfreq is not set, the mean is taken
            restfreq: u.spectral - equivalent unit, the rest frequency to use when kl is True
            kwargs: to be passed to axes.scatter

        Returns:
            axes with the uv grid plotted
        """
        if axes is None:
            _, axes = plt.subplots(1)
            axes.set_aspect('equal')

        xplot = u.Quantity([*self.u, *(-self.u)])
        yplot = u.Quantity([*self.v, *(-self.v)])

        if restfreq is None:
            restfreq = np.median(self.frequencies)
        restwl = restfreq.to(u.m, equivalencies=u.spectral())
        if kl:
            xplot = (xplot / restwl).to(self.kl)
            yplot = (yplot / restwl).to(self.kl)

        sizes = symsize * self.weight[:, 0] / (self.weight.max())
        sizes = np.array([*sizes, *sizes])
        axes.invert_xaxis()
        axes.scatter(xplot, yplot, alpha=0.5, s=sizes, **kwargs)
        return axes

    def plot_uv_channel_map(
            self,
            nx: int = None, ny: int = None,
            channels: Union[range, List[int], Literal[Ellipsis]] = Ellipsis,
            subplot_kw: dict = None,
            gridspec_kw: dict = (("hspace", 0), ("wspace", 0)),
            fig_kw: dict = None,
            symsize: float = 0.5,
            **kwargs
    ) -> matplotlib.figure.Figure:
        gridspec_kw = dict(gridspec_kw)
        data_to_plot = self.data[:, channels, :]
        re = data_to_plot[:, :, 0]
        im = data_to_plot[:, :, 1]

        frequencies = self.frequencies[channels]
        nchannels = re.shape[1]

        if nx is None and ny is not None:
            nx = math.ceil(nchannels / ny)
        elif nx is not None and ny is None:
            ny = math.ceil(nchannels / nx)
        elif nx is not None and ny is not None:
            if nx * ny > nchannels:
                raise CHEFValueError(f"Inconsistent number of channels: nx * ny = {nx * ny} > nchannels = {nchannels}")
        else:
            ny = nx = math.ceil(math.sqrt(nchannels))

        if fig_kw is None: fig_kw = {}
        fig, axes = plt.subplots(
            ny, nx, squeeze=False, subplot_kw=subplot_kw, gridspec_kw=gridspec_kw,
            sharex="all", sharey="all",
            **fig_kw
        )
        axes.flatten()[0].invert_xaxis()
        for ax in axes.flatten(): ax.set_visible(False)
        xplot = u.Quantity([*self.u, *(-self.u)])
        yplot = u.Quantity([*self.v, *(-self.v)])

        visibility = re + 1j * im
        visibility = [*visibility, *visibility] * u.Jy
        color = np.angle(visibility)
        sizes = np.abs(visibility)
        sizes /= sizes.max() * symsize

        for vis, freq, s, col, ax in zip(visibility.T, frequencies, sizes.T, color.T, axes.flatten()):
            ax.set_visible(True)
            restwl = freq.to(u.m, equivalencies=u.spectral())
            xxplot = (xplot / restwl).to(self.kl)
            yyplot = (yplot / restwl).to(self.kl)
            ax.scatter(xxplot, yyplot, s=s, c=col, cmap="twilight", **kwargs)
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)

        axes[ny - 1, 0].get_xaxis().set_visible(True)
        axes[ny - 1, 0].get_yaxis().set_visible(True)

        return fig

    def plot_total_power(
            self,
            rest_freq: u.Hz = None,
            axes: matplotlib.axes.Axes = None,
            axes_unit: Union[u.Unit, Literal["channel"]] = None,
    ) -> matplotlib.axes.Axes:
        """Plot the sum of visibilities collected by the interferometer.

        This is not actually the total power, but gives the idea for the bright lines

        Args:
            rest_freq: u.Hz - the rest frequency of the line to convert axes to km/s
            axes: matplotlib.axes.Axes - the axes to plot on
            axes_unit: u.Unit - the unit to convert the axes to, defaults to the original axes unit (likely Hz)
        """
        if rest_freq is not None:
            frequencies = self.frequencies.to(u.km / u.s, equivalencies=u.doppler_radio(rest_freq))
        else:
            frequencies = self.frequencies
        if axes_unit is not None:
            if axes_unit == "channel":
                frequencies = np.arange(len(frequencies))
            else:
                frequencies = frequencies.to(axes_unit)
        if axes is None:
            _, axes = plt.subplots(1)
        axes.plot(
            frequencies,
            np.sum(np.abs((self.visibility * self.weight) / self.weight.sum()), axis=0)
        )
        return axes

    def image_to_visibilities(self, cube: Union[spectral_cube.SpectralCube, PathLike]):
        """Import cube from a FITS `file`, sample it with visibilities of this UVFITS"""
        if isinstance(cube, spectral_cube.SpectralCube):
            pass
        else:
            cube = spectral_cube.SpectralCube.read(cube)
        pixel_area_units = u.Unit(cube.wcs.celestial.world_axis_units[0]) * u.Unit(
            cube.wcs.celestial.world_axis_units[1])
        pixel_area = astropy.wcs.utils.proj_plane_pixel_area(cube.wcs.celestial) * pixel_area_units
        dxy = np.sqrt(pixel_area).to_value(u.rad)
        cube = (cube.to(u.Jy / u.sr) * pixel_area).to(u.Jy)

        visibilities = []
        for i, frequency in enumerate(
                cube.with_spectral_unit(u.Hz, velocity_convention="radio").spectral_axis
        ):
            wl = (c.c / frequency).si
            u_wavelengths = (self.u / wl).to_value(u.dimensionless_unscaled)
            v_wavelengths = (self.v / wl).to_value(u.dimensionless_unscaled)
            vis = g_double.sampleImage(cube[i], dxy, u_wavelengths, v_wavelengths, origin='lower')
            visibilities.append(vis)
        visibilities = np.array(visibilities)
        visibilities_real_imag_weight = np.array(
            [visibilities.real, visibilities.imag, np.ones_like(visibilities.imag)]
        )
        uvdata_arr = visibilities_real_imag_weight.T.reshape(
            visibilities.shape[1], 1, 1, 1, visibilities.shape[0], 1, 3
        )
        self.set_data(uvdata_arr, cube.with_spectral_unit(u.Hz).spectral_axis)
        self.frequencies = cube.spectral_axis

    @property
    @u.quantity_input
    def visibility(self) -> u.Jy:
        """Returns complex array of visibities"""
        return self.re + self.im * 1j

    def pickle(self, filename: PathLike = None):
        """Pickle the UV coordinates so they can be reloaded later

        Args:
            filename: Pathlike, default: self.path + ".pkl". Should end with ".pkl"`"
        """
        if filename is None:
            filename = str(self.path) + ".pkl"
        else:
            if not filename.lower().endswith(".pkl"):
                self.logger.warning("If the pickle filename does not end with .pkl, "
                                    "it won't be automatically recognized")
        with open(filename, "wb") as uvpkl:
            pickle.dump((self.u, self.v), uvpkl)

    @property
    def degrees_of_freedom(self):
        return np.prod(self.re.shape)

    def chi2_with(self, data=Union[PathLike, spectral_cube.SpectralCube], threads: int = None, **kwargs) -> float:
        """Method to calculate chi-squared of a given UV set with a data cube

        Args:
            data: PathLike -- path to data cube readable by SpectralCube.read OR the spectral cube itself
            threads: int -- number of threads for galario. Sets globally until called again with non-default value.
        """
        if threads is not None:
            g_double.threads(threads)

        if not isinstance(data, spectral_cube.SpectralCube):
            data = spectral_cube.SpectralCube.read(data)
        if data.spectral_axis.unit != self.frequencies.unit:
            data = data.with_spectral_unit(self.frequencies.unit, velocity_convention="radio")
        self.logger.debug("Data spectral axis: %s", data.spectral_axis)
        self.logger.debug("UVTable spectral axis: %s", self.frequencies)
        if (data.spectral_axis.shape != self.frequencies.shape) or (
                not np.all(np.equal(data.spectral_axis, self.frequencies))):
            self.logger.info("Interpolate data to UVTable spectral grid")
            data = data.spectral_interpolate(spectral_grid=self.frequencies)

        pixel_area_units = u.Unit(data.wcs.celestial.world_axis_units[0]) \
                           * u.Unit(data.wcs.celestial.world_axis_units[1])
        pixel_area = astropy.wcs.utils.proj_plane_pixel_area(data.wcs.celestial) * pixel_area_units
        dxy = np.sqrt(pixel_area).to_value(u.rad)
        self.logger.debug("Data pixel area %s and size %s radian", pixel_area, dxy)
        data = (data.to(u.Jy / u.sr) * pixel_area).to(u.Jy)
        self.chi_per_channel = []
        for cube_slice, _wavelength, _re, _im, _weight in zip(
                data, self.wavelengths, self.re.T, self.im.T, self.weight.T
        ):
            chi_per_channel = g_double.chi2Image(
                image=cube_slice,
                dxy=dxy,
                u=(self.u / _wavelength).to_value(u.dimensionless_unscaled),
                v=(self.v / _wavelength).to_value(u.dimensionless_unscaled),
                vis_obs_re=_re.astype('float64').to_value(u.Jy),
                vis_obs_im=_im.astype('float64').to_value(u.Jy),
                vis_obs_w=_weight.astype('float64').to_value(u.Jy ** -2),
                origin='lower',
                **kwargs
            )
            self.chi_per_channel.append(chi_per_channel)
        return float(np.sum(self.chi_per_channel))

    @classmethod
    def write_visibilities_to_uvfits(
            cls,
            data_to_write: Union[PathLike, spectral_cube.SpectralCube],
            file_to_modify: PathLike,
            output_filename: PathLike = None,
            uv_kwargs: dict = None,
            residual: bool = False,
    ) -> PathLike:
        """
        Replace visibilities of `file_to_modify` to visibilities sampled from `data_to_write`,
        save into `output_filename`.

        Args:
            data_to_write: PathLike or SpectralCube, data to replace the original table
            file_to_modify: PathLike, uvfits file with visibilities and frequencies to use as a reference
            output_filename: PathLike, optional: name of new uvfits file, `file_to_modify`.modified.uvfits by default
            uv_kwargs: kwargs to be passed to UVFits initialization, ie `sum` or `channel`
            residual: if True, write input residuals `data_to_be_modified - data_to_write` instead of `data_to_write`
        Returns:
            output file name

        Usage:
            >>> UVFits.write_visibilities_to_uvfits(  # doctest: +SKIP
            ...     data_to_write="13CO J=2-1_image.fits",
            ...     file_to_modify="13co.uvfits",
            ...     output_filename="13co_model.uvfits",
            ...     uv_kwargs={'sum': False}
            ... )
        """
        if uv_kwargs is None:
            uv_kwargs = {}
        if output_filename is None:
            output_filename = Path(f"{file_to_modify}.modified.uvfits")

        if not isinstance(data_to_write, spectral_cube.SpectralCube):
            data_to_write = spectral_cube.SpectralCube.read(data_to_write)

        uvfits = cls(file_to_modify, **uv_kwargs)
        uvfits.image_to_visibilities(
            data_to_write.spectral_interpolate(uvfits.frequencies)
        )
        data_to_write_array = np.array(uvfits.data)

        hdul_to_modify = astropy.io.fits.open(file_to_modify, lazy_load_hdus=False, memmap=False)
        data_to_be_modified = hdul_to_modify[0].data["DATA"][:, 0, 0, 0, :, 0, :]
        if residual:
            data_to_be_modified[:, :, 0:2] -= data_to_write_array[:, :, 0:2]
        else:
            data_to_be_modified[:, :, 0:2] = data_to_write_array[:, :, 0:2]

        hdul_to_modify.writeto(output_filename, overwrite=True)

        return output_filename

    @classmethod
    def run_gildas_script(
            cls,
            script: str = "SAY HELLO WORLD",
            gildas_executable: PathLike = "imager",
            script_filename: PathLike = "last.imager",
            folder=None,
    ) -> subprocess.CompletedProcess:
        """
        Send script to GILDAS.

        Args:
            script: GILDAS script to run
            gildas_executable: path to GILDAS executable (imager, astro, etc.)
            script_filename: filename for script as it will be saved as a file
            folder: working directory for script

        Returns:
            subprocess.CompletedProcess instance with .stdout and .stderr attributes
        """
        if folder is None:
            folder = Path.cwd()

        script_filename = Path(script_filename)
        with open(script_filename, "w") as fff:
            fff.write(textwrap.dedent(script))

        command = f'cat "{script_filename.resolve()}" | {gildas_executable} -nw'
        logging.debug("%s$ %s", folder, command)
        proc = subprocess.run(
            command,
            capture_output=True, encoding='utf8', shell=True,
            cwd=folder
        )
        return proc

    @classmethod
    def run_imaging(
            cls,
            input_file: PathLike,
            name: str,
            imager_executable: PathLike = "imager",
            script_template: str = """
                        FITS "{input_file}" TO "{name}"
                        READ UV "{name}"
                        UV_MAP
                        CLEAN
                        LUT {lut}
                        ! VIEW CLEAN /NOPAUSE
                        LET SIZE 10
                        LET DO_CONTOUR NO
                        SHOW CLEAN
                        HARDCOPY "{name}.{device}" /DEVICE {device} /OVERWRITE
                """,
            script_filename: PathLike = "last.imager",
            device: Union[Literal["pdf", "png", "eps", "ps"], str] = "pdf",
            lut: str = "inferno",
            **kwargs
    ) -> subprocess.CompletedProcess:
        """
        Send script to GILDAS. The primary use is to send an imaging script to IMAGER

        Args:
            input_file: path to .uvfits file to be imaged, passed to `script_template.format
            name: basename for the output files, passed to `script_template.format. Whitespaces are replaced with '_'
            imager_executable: path to imager executable, if not in system PATH
            script_template: string containing script with parameters listed in {} for .format
            script_filename: the filename for the created script
            device: DEVICE keyword for `[GTVL\]HARDCOPY`, the output figure file format,
                passed to `script_template.format
            lut: argument for `[GTVL\]LUT` for colormap setting, `inferno` by default, passed to `script_template.format
            **kwargs: other arguments passed to `script_template.format`

        Returns:
            subprocess.CompletedProcess instance with .stdout and .stderr attributes

        Usage:
            >>> UVFits.run_imaging(input_file="model.uvfits", name="model")  # doctest: +SKIP
        """

        script_filename = Path(script_filename)
        input_file = Path(input_file)

        proc = cls.run_gildas_script(
            script=script_template.format(
                input_file=input_file.name,
                name=name,
                lut=lut,
                device=device,
                **kwargs
            ),
            gildas_executable=imager_executable,
            script_filename=script_filename,
            folder=input_file.parent
        )
        return proc

    @classmethod
    def run_gildas(cls, *args, **kwargs):
        """Deprecated, renamed to run_imaging. Alsom see run_gildas_script to run arbitrary GILDAS scripts."""
        warnings.warn("run_gildas is deprecated, use run_imaging instead", DeprecationWarning)
        return cls.run_imaging(*args, **kwargs)

Reads GILDAS-outputted visibilities UVFits file

Args

path
PathLike – path to the uv fits table
channel
which channel to take from the file int – single channel list – given channels slice – (a:b:c == slice(a,b,c)) – slice of channels 'all' – all channels from the file
sum
bool – calculate weigthed mean of all channels (for continuum)

Usage:

>>> uv = UVFits(
...     os.path.join(os.path.dirname(__file__), "..", "tests", "data", "s-Wide-1+C.uvfits"),
...     'all', sum=True
... )
>>> uv.table[0:5].pprint_all()  # doctest: +NORMALIZE_WHITESPACE
         u                   v                 Re [1]               Im [1]            Weight [1]
         m                   m                   Jy                   Jy               1 / Jy2
------------------- ------------------- ------------------- --------------------- ------------------
-0.8546872724500936 -111.88782560913619 0.08481375196790505   0.02011162435339697  4.392747296119472
  35.68536730327412   61.11044271472704 0.14395800105016973 0.0011129999808199087  4.906656253315232
  36.53999262455663  172.99830358929054 0.04616766278918842 -0.009757890697913866  4.810738961168015
 -82.12765585917153 -41.241167812120636 0.14572769842806682  0.012205666181597085 2.6732435871384994
 -81.27303053788901   70.64653734266903  0.1260857611090533 -0.018951768666146913  2.634196780195305
>>> uv = UVFits(
...     os.path.join(os.path.dirname(__file__), "..", "tests", "data", "s-Wide-1+C.uvfits"),
...     [1,2,3], sum=False
... )
>>> uv.table[0:5].pprint_all()  # doctest: +NORMALIZE_WHITESPACE
         u                   v                             Re [3]                                        Im [3]                                     Weight [3]
         m                   m                               Jy                                            Jy                                          1 / Jy2
------------------- ------------------- ------------------------------------------- ----------------------------------------------- ------------------------------------------
-0.8546872724500936 -111.88782560913619  0.08013948631211772 .. 0.07302437694644187    0.022133425091247466 .. 0.013778203363983665 0.19701689428642943 .. 0.20917844580050068
  35.68536730327412   61.11044271472704   0.14916945376238983 .. 0.1606281736906061   0.004351084873651592 .. -0.008113799327774212 0.22006599150348807 .. 0.23365029837275292
  36.53999262455663  172.99830358929054 0.02026272564921367 .. 0.051114215373805026 -0.007280084597144622 .. -0.0062913988333082845 0.21576404157411666 .. 0.22908280648299315
 -82.12765585917153 -41.241167812120636  0.13376226427333338 .. 0.18171489805253205     0.014664527234772186 .. 0.03560468549769666 0.11989631175397047 .. 0.12729731479045647
 -81.27303053788901   70.64653734266903   0.1422663387734871 .. 0.15214664685856188  -0.008367051589066395 .. -0.017851764309400768 0.11814501360102375 .. 0.12543794548908482

Class variables

var kl

Static methods

def run_gildas(*args, **kwargs)

Deprecated, renamed to run_imaging. Alsom see run_gildas_script to run arbitrary GILDAS scripts.

def run_gildas_script(script: str = 'SAY HELLO WORLD',
gildas_executable: str | os.PathLike = 'imager',
script_filename: str | os.PathLike = 'last.imager',
folder=None) ‑> subprocess.CompletedProcess

Send script to GILDAS.

Args

script
GILDAS script to run
gildas_executable
path to GILDAS executable (imager, astro, etc.)
script_filename
filename for script as it will be saved as a file
folder
working directory for script

Returns

subprocess.CompletedProcess instance with .stdout and .stderr attributes

def run_imaging(input_file: str | os.PathLike,
name: str,
imager_executable: str | os.PathLike = 'imager',
script_template: str = '\n FITS "{input_file}" TO "{name}"\n READ UV "{name}"\n UV_MAP\n CLEAN\n LUT {lut}\n ! VIEW CLEAN /NOPAUSE\n LET SIZE 10\n LET DO_CONTOUR NO\n SHOW CLEAN\n HARDCOPY "{name}.{device}" /DEVICE {device} /OVERWRITE\n ',
script_filename: str | os.PathLike = 'last.imager',
device: Literal['pdf', 'png', 'eps', 'ps'] | str = 'pdf',
lut: str = 'inferno',
**kwargs) ‑> subprocess.CompletedProcess

Send script to GILDAS. The primary use is to send an imaging script to IMAGER

Args

input_file
path to .uvfits file to be imaged, passed to `script_template.format
name
basename for the output files, passed to `script_template.format. Whitespaces are replaced with '_'
imager_executable
path to imager executable, if not in system PATH
script_template
string containing script with parameters listed in {} for .format
script_filename
the filename for the created script
device
DEVICE keyword for [GTVL\]HARDCOPY, the output figure file format, passed to `script_template.format
lut
argument for [GTVL\]LUT for colormap setting, inferno by default, passed to `script_template.format
**kwargs
other arguments passed to script_template.format

Returns

subprocess.CompletedProcess instance with .stdout and .stderr attributes

Usage

>>> UVFits.run_imaging(input_file="model.uvfits", name="model")  # doctest: +SKIP
def write_visibilities_to_uvfits(data_to_write: str | os.PathLike | spectral_cube.spectral_cube.SpectralCube,
file_to_modify: str | os.PathLike,
output_filename: str | os.PathLike = None,
uv_kwargs: dict = None,
residual: bool = False) ‑> str | os.PathLike

Replace visibilities of file_to_modify to visibilities sampled from data_to_write, save into output_filename.

Args

data_to_write
PathLike or SpectralCube, data to replace the original table
file_to_modify
PathLike, uvfits file with visibilities and frequencies to use as a reference
output_filename
PathLike, optional: name of new uvfits file, file_to_modify.modified.uvfits by default
uv_kwargs
kwargs to be passed to UVFits initialization, ie sum or channel
residual
if True, write input residuals data_to_be_modified - data_to_write instead of data_to_write

Returns

output file name

Usage

>>> UVFits.write_visibilities_to_uvfits(  # doctest: +SKIP
...     data_to_write="13CO J=2-1_image.fits",
...     file_to_modify="13co.uvfits",
...     output_filename="13co_model.uvfits",
...     uv_kwargs={'sum': False}
... )

Instance variables

prop data
Expand source code
@property
def data(self):
    return self._fits['DATA'][:, 0, 0, 0, :, 0, :]
prop degrees_of_freedom
Expand source code
@property
def degrees_of_freedom(self):
    return np.prod(self.re.shape)
prop rest_freq : Unit("Hz")
Expand source code
@property
def rest_freq(self) -> u.Hz:
    return self._fits.meta.get("RESTFREQ", None) * u.Hz
prop visibility : Unit("Jy")
Expand source code
@property
@u.quantity_input
def visibility(self) -> u.Jy:
    """Returns complex array of visibities"""
    return self.re + self.im * 1j

Returns complex array of visibities

prop wavelengths
Expand source code
@property
def wavelengths(self):
    return c.c / self.frequencies

Methods

def chi2_with(self,
data=typing.Union[str, os.PathLike, spectral_cube.spectral_cube.SpectralCube],
threads: int = None,
**kwargs) ‑> float
Expand source code
def chi2_with(self, data=Union[PathLike, spectral_cube.SpectralCube], threads: int = None, **kwargs) -> float:
    """Method to calculate chi-squared of a given UV set with a data cube

    Args:
        data: PathLike -- path to data cube readable by SpectralCube.read OR the spectral cube itself
        threads: int -- number of threads for galario. Sets globally until called again with non-default value.
    """
    if threads is not None:
        g_double.threads(threads)

    if not isinstance(data, spectral_cube.SpectralCube):
        data = spectral_cube.SpectralCube.read(data)
    if data.spectral_axis.unit != self.frequencies.unit:
        data = data.with_spectral_unit(self.frequencies.unit, velocity_convention="radio")
    self.logger.debug("Data spectral axis: %s", data.spectral_axis)
    self.logger.debug("UVTable spectral axis: %s", self.frequencies)
    if (data.spectral_axis.shape != self.frequencies.shape) or (
            not np.all(np.equal(data.spectral_axis, self.frequencies))):
        self.logger.info("Interpolate data to UVTable spectral grid")
        data = data.spectral_interpolate(spectral_grid=self.frequencies)

    pixel_area_units = u.Unit(data.wcs.celestial.world_axis_units[0]) \
                       * u.Unit(data.wcs.celestial.world_axis_units[1])
    pixel_area = astropy.wcs.utils.proj_plane_pixel_area(data.wcs.celestial) * pixel_area_units
    dxy = np.sqrt(pixel_area).to_value(u.rad)
    self.logger.debug("Data pixel area %s and size %s radian", pixel_area, dxy)
    data = (data.to(u.Jy / u.sr) * pixel_area).to(u.Jy)
    self.chi_per_channel = []
    for cube_slice, _wavelength, _re, _im, _weight in zip(
            data, self.wavelengths, self.re.T, self.im.T, self.weight.T
    ):
        chi_per_channel = g_double.chi2Image(
            image=cube_slice,
            dxy=dxy,
            u=(self.u / _wavelength).to_value(u.dimensionless_unscaled),
            v=(self.v / _wavelength).to_value(u.dimensionless_unscaled),
            vis_obs_re=_re.astype('float64').to_value(u.Jy),
            vis_obs_im=_im.astype('float64').to_value(u.Jy),
            vis_obs_w=_weight.astype('float64').to_value(u.Jy ** -2),
            origin='lower',
            **kwargs
        )
        self.chi_per_channel.append(chi_per_channel)
    return float(np.sum(self.chi_per_channel))

Method to calculate chi-squared of a given UV set with a data cube

Args

data
PathLike – path to data cube readable by SpectralCube.read OR the spectral cube itself
threads
int – number of threads for galario. Sets globally until called again with non-default value.
def image_to_visibilities(self, cube: str | os.PathLike | spectral_cube.spectral_cube.SpectralCube)
Expand source code
def image_to_visibilities(self, cube: Union[spectral_cube.SpectralCube, PathLike]):
    """Import cube from a FITS `file`, sample it with visibilities of this UVFITS"""
    if isinstance(cube, spectral_cube.SpectralCube):
        pass
    else:
        cube = spectral_cube.SpectralCube.read(cube)
    pixel_area_units = u.Unit(cube.wcs.celestial.world_axis_units[0]) * u.Unit(
        cube.wcs.celestial.world_axis_units[1])
    pixel_area = astropy.wcs.utils.proj_plane_pixel_area(cube.wcs.celestial) * pixel_area_units
    dxy = np.sqrt(pixel_area).to_value(u.rad)
    cube = (cube.to(u.Jy / u.sr) * pixel_area).to(u.Jy)

    visibilities = []
    for i, frequency in enumerate(
            cube.with_spectral_unit(u.Hz, velocity_convention="radio").spectral_axis
    ):
        wl = (c.c / frequency).si
        u_wavelengths = (self.u / wl).to_value(u.dimensionless_unscaled)
        v_wavelengths = (self.v / wl).to_value(u.dimensionless_unscaled)
        vis = g_double.sampleImage(cube[i], dxy, u_wavelengths, v_wavelengths, origin='lower')
        visibilities.append(vis)
    visibilities = np.array(visibilities)
    visibilities_real_imag_weight = np.array(
        [visibilities.real, visibilities.imag, np.ones_like(visibilities.imag)]
    )
    uvdata_arr = visibilities_real_imag_weight.T.reshape(
        visibilities.shape[1], 1, 1, 1, visibilities.shape[0], 1, 3
    )
    self.set_data(uvdata_arr, cube.with_spectral_unit(u.Hz).spectral_axis)
    self.frequencies = cube.spectral_axis

Import cube from a FITS file, sample it with visibilities of this UVFITS

def pickle(self, filename: str | os.PathLike = None)
Expand source code
def pickle(self, filename: PathLike = None):
    """Pickle the UV coordinates so they can be reloaded later

    Args:
        filename: Pathlike, default: self.path + ".pkl". Should end with ".pkl"`"
    """
    if filename is None:
        filename = str(self.path) + ".pkl"
    else:
        if not filename.lower().endswith(".pkl"):
            self.logger.warning("If the pickle filename does not end with .pkl, "
                                "it won't be automatically recognized")
    with open(filename, "wb") as uvpkl:
        pickle.dump((self.u, self.v), uvpkl)

Pickle the UV coordinates so they can be reloaded later

Args

filename
Pathlike, default: self.path + ".pkl". Should end with ".pkl"`"
def plot_total_power(self,
rest_freq: Unit("Hz") = None,
axes: matplotlib.axes._axes.Axes = None,
axes_unit: astropy.units.core.Unit | Literal['channel'] = None) ‑> matplotlib.axes._axes.Axes
Expand source code
def plot_total_power(
        self,
        rest_freq: u.Hz = None,
        axes: matplotlib.axes.Axes = None,
        axes_unit: Union[u.Unit, Literal["channel"]] = None,
) -> matplotlib.axes.Axes:
    """Plot the sum of visibilities collected by the interferometer.

    This is not actually the total power, but gives the idea for the bright lines

    Args:
        rest_freq: u.Hz - the rest frequency of the line to convert axes to km/s
        axes: matplotlib.axes.Axes - the axes to plot on
        axes_unit: u.Unit - the unit to convert the axes to, defaults to the original axes unit (likely Hz)
    """
    if rest_freq is not None:
        frequencies = self.frequencies.to(u.km / u.s, equivalencies=u.doppler_radio(rest_freq))
    else:
        frequencies = self.frequencies
    if axes_unit is not None:
        if axes_unit == "channel":
            frequencies = np.arange(len(frequencies))
        else:
            frequencies = frequencies.to(axes_unit)
    if axes is None:
        _, axes = plt.subplots(1)
    axes.plot(
        frequencies,
        np.sum(np.abs((self.visibility * self.weight) / self.weight.sum()), axis=0)
    )
    return axes

Plot the sum of visibilities collected by the interferometer.

This is not actually the total power, but gives the idea for the bright lines

Args

rest_freq
u.Hz - the rest frequency of the line to convert axes to km/s
axes
matplotlib.axes.Axes - the axes to plot on
axes_unit
u.Unit - the unit to convert the axes to, defaults to the original axes unit (likely Hz)
def plot_uv_channel_map(self,
nx: int = None,
ny: int = None,
channels: range | List[int] | Literal[...] = Ellipsis,
subplot_kw: dict = None,
gridspec_kw: dict = (('hspace', 0), ('wspace', 0)),
fig_kw: dict = None,
symsize: float = 0.5,
**kwargs) ‑> matplotlib.figure.Figure
Expand source code
def plot_uv_channel_map(
        self,
        nx: int = None, ny: int = None,
        channels: Union[range, List[int], Literal[Ellipsis]] = Ellipsis,
        subplot_kw: dict = None,
        gridspec_kw: dict = (("hspace", 0), ("wspace", 0)),
        fig_kw: dict = None,
        symsize: float = 0.5,
        **kwargs
) -> matplotlib.figure.Figure:
    gridspec_kw = dict(gridspec_kw)
    data_to_plot = self.data[:, channels, :]
    re = data_to_plot[:, :, 0]
    im = data_to_plot[:, :, 1]

    frequencies = self.frequencies[channels]
    nchannels = re.shape[1]

    if nx is None and ny is not None:
        nx = math.ceil(nchannels / ny)
    elif nx is not None and ny is None:
        ny = math.ceil(nchannels / nx)
    elif nx is not None and ny is not None:
        if nx * ny > nchannels:
            raise CHEFValueError(f"Inconsistent number of channels: nx * ny = {nx * ny} > nchannels = {nchannels}")
    else:
        ny = nx = math.ceil(math.sqrt(nchannels))

    if fig_kw is None: fig_kw = {}
    fig, axes = plt.subplots(
        ny, nx, squeeze=False, subplot_kw=subplot_kw, gridspec_kw=gridspec_kw,
        sharex="all", sharey="all",
        **fig_kw
    )
    axes.flatten()[0].invert_xaxis()
    for ax in axes.flatten(): ax.set_visible(False)
    xplot = u.Quantity([*self.u, *(-self.u)])
    yplot = u.Quantity([*self.v, *(-self.v)])

    visibility = re + 1j * im
    visibility = [*visibility, *visibility] * u.Jy
    color = np.angle(visibility)
    sizes = np.abs(visibility)
    sizes /= sizes.max() * symsize

    for vis, freq, s, col, ax in zip(visibility.T, frequencies, sizes.T, color.T, axes.flatten()):
        ax.set_visible(True)
        restwl = freq.to(u.m, equivalencies=u.spectral())
        xxplot = (xplot / restwl).to(self.kl)
        yyplot = (yplot / restwl).to(self.kl)
        ax.scatter(xxplot, yyplot, s=s, c=col, cmap="twilight", **kwargs)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

    axes[ny - 1, 0].get_xaxis().set_visible(True)
    axes[ny - 1, 0].get_yaxis().set_visible(True)

    return fig
def plot_uvgrid(self,
axes: matplotlib.axes._axes.Axes = None,
kl: bool = False,
restfreq: Unit("Hz") = None,
symsize: float = 1,
**kwargs) ‑> matplotlib.axes._axes.Axes
Expand source code
def plot_uvgrid(
        self,
        axes: matplotlib.axes.Axes = None,
        kl: bool = False, restfreq: u.Hz = None,
        symsize: float = 1,
        **kwargs
) -> matplotlib.axes.Axes:
    """

    Args:
        axes: axes to draw the map on
        kl: if True, plots in dimensionless [kλ], thousands of wavelength. If more than one frequency is present,
        and restfreq is not set, the mean is taken
        restfreq: u.spectral - equivalent unit, the rest frequency to use when kl is True
        kwargs: to be passed to axes.scatter

    Returns:
        axes with the uv grid plotted
    """
    if axes is None:
        _, axes = plt.subplots(1)
        axes.set_aspect('equal')

    xplot = u.Quantity([*self.u, *(-self.u)])
    yplot = u.Quantity([*self.v, *(-self.v)])

    if restfreq is None:
        restfreq = np.median(self.frequencies)
    restwl = restfreq.to(u.m, equivalencies=u.spectral())
    if kl:
        xplot = (xplot / restwl).to(self.kl)
        yplot = (yplot / restwl).to(self.kl)

    sizes = symsize * self.weight[:, 0] / (self.weight.max())
    sizes = np.array([*sizes, *sizes])
    axes.invert_xaxis()
    axes.scatter(xplot, yplot, alpha=0.5, s=sizes, **kwargs)
    return axes

Args

axes
axes to draw the map on
kl
if True, plots in dimensionless [kλ], thousands of wavelength. If more than one frequency is present,
and restfreq is not set, the mean is taken
restfreq
u.spectral - equivalent unit, the rest frequency to use when kl is True
kwargs
to be passed to axes.scatter

Returns

axes with the uv grid plotted

def set_data(self, data: numpy.ndarray, frequencies: Unit("Hz") = None)
Expand source code
@u.quantity_input
def set_data(self, data: np.ndarray, frequencies: u.Hz = None):
    """Set new visibilities data (and, optionally, frequencies) to UVFits instance"""
    if self._fits is not None:
        if self._fits["DATA"].shape != data.shape:
            if frequencies.shape != self.data.shape[1] and self.frequencies is None:
                raise CHEFValueError("Shape mismatch!")
            self.frequencies = frequencies
        self._update_fits(data, frequencies)
    else:
        self.re = data[:, 0, 0, 0, :, 0, :][:, :, 0] << u.Jy
        self.im = data[:, 0, 0, 0, :, 0, :][:, :, 1] << u.Jy
        self.weight = data[:, 0, 0, 0, :, 0, :][:, :, 2] << (u.Jy ** -2)
        self.table['Re'] = self.re
        self.table['Im'] = self.im
        self.table['Weight'] = self.weight

Set new visibilities data (and, optionally, frequencies) to UVFits instance