Module diskchef.maps.radmcrt
Expand source code
import glob
import logging
from dataclasses import dataclass, field
import os
import re
from functools import cached_property
from pathlib import Path
from typing import Union, Literal, NamedTuple
import subprocess
import platform
import chemical_names
from matplotlib.figure import Figure
from matplotlib.gridspec import GridSpec
from radio_beam import Beam
from regions import CircleSkyRegion
from scipy.optimize import curve_fit
from spectral_cube import SpectralCube
import diskchef.maps.radiation_fields
import numpy as np
from astropy import constants as c
from astropy import units as u
from astropy.coordinates import SkyCoord
from matplotlib import pyplot as plt
from matplotlib.colors import Normalize
import matplotlib.patches
import spectral_cube.utils
import radmc3dPy
from diskchef.engine.ctable import CTable
from diskchef.engine.exceptions import CHEFNotImplementedError, CHEFTypeError
from diskchef.engine.other import PathLike
from diskchef.lamda.line import Line
from diskchef.maps.base import MapBase
class Cloud(NamedTuple):
"""Tuple to store the foreground and background to mark on channel maps. Used in RadMCOutput"""
velocity_min: u.Quantity
velocity_max: u.Quantity
@dataclass
class RadMCOutput:
"""Class to store information and visualize the RadMCRT module output"""
line: Line
file_radmc: PathLike = None
file_fits: PathLike = None
object_name: str = ""
distance: u.pc = None
cloud: Cloud = None
mode: Literal["image", "image tracetau"] = "image"
def __post_init__(self):
self.logger = logging.getLogger(__name__ + '.' + self.__class__.__qualname__)
self.logger.info("Creating an instance of %s", self.__class__.__qualname__)
self.logger.debug("With parameters: %s", self.__dict__)
@staticmethod
def gauss(x, H, A, x0, sigma):
"""Gaussian function with a background"""
return H + A * np.exp(-(x - x0) ** 2 / (2 * sigma ** 2))
@classmethod
def gauss_fit(cls, x, y):
"""Fit a gaussian into data"""
mean = sum(x * y) / sum(y)
sigma = np.sqrt(sum(y * (x - mean) ** 2) / sum(y))
popt, pcov = curve_fit(cls.gauss, x, y, p0=[min(y), max(y), mean, sigma])
return popt
@classmethod
def sigma_from_data(cls, data: np.ndarray) -> float:
"""Find rms of the data by fitting a gaussian into the histogram
(assuming most of the pixels are noise)"""
values, edges = np.histogram(data, bins=100, density=True)
means = 0.5 * (edges[1:] + edges[:-1])
params = cls.gauss_fit(means, values)
# plt.step(means, values, where="mid")
# print(params)
# plt.plot(means, cls.gauss(means, *params))
return params[3]
def finalize_plot(self, axes, cube, window, ctr_data=None, ellcolor="black", **kwargs):
"""Finalize the channel or moment map figure."""
dist_pc = self.distance.to_value(u.pc)
pxsize = np.sqrt(np.abs(np.linalg.det(cube.wcs.celestial.pixel_scale_matrix)))
try:
axes.coords[0].set_major_formatter('hh:mm:ss')
axes.coords[1].set_major_formatter('dd:mm:ss')
axes.coords[0].set_ticks(spacing=5 * u.arcsec) # , format="hh:mm:ss")
axes.coords[1].set_ticks(spacing=5 * u.arcsec) # , format="dd:mm:ss")
axes.coords[0].set_ticklabel(exclude_overlapping=True)
axes.coords[1].set_ticklabel(exclude_overlapping=True)
except AttributeError:
pass
axes.scatter(
0, 0,
color='white', marker='x', zorder=100
)
if ctr_data is not None:
noise = self.sigma_from_data(ctr_data)
noise_value = getattr(noise, "value", noise)
ctr_3sigma = axes.contour(
ctr_data,
levels=[3 * noise_value],
colors='black',
linewidths=0.5,
)
else:
ctr_3sigma = None
try:
ell = matplotlib.patches.Ellipse(
(-0.8 * window, -0.8 * window),
cube.beam.major.to_value(u.arcsec) * dist_pc,
cube.beam.minor.to_value(u.arcsec) * dist_pc,
cube.beam.pa.value + 90,
hatch='///',
color=ellcolor
)
axes.add_patch(ell)
except spectral_cube.utils.NoBeamError as e:
pass
return {"ctr_3sigma": ctr_3sigma}
def _check_distance(self):
if self.distance is None:
try:
self.logger.warning(
"Distance is not set. Querying from Simbad, but it is not free. "
"Provide `distance` argument to RadMCOutput instead")
from astroquery.simbad import Simbad
simquery = Simbad()
simquery.add_votable_fields('distance', 'velocity')
self.distance = simquery.query_object(self.object_name)[0]["Distance_distance"] * u.pc
except ImportError:
self.logger.warning(
"Distance to the object was not set. astroquery is not installed. Cannot query Simbad."
)
except Exception as e:
self.logger.warning("Something went wrong querying Simbad: %s", e)
@cached_property
def unit_for_channel_map(self) -> u.Unit:
if self.mode == "image":
return u.K
elif self.mode == "image tracetau":
return u.dimensionless_unscaled
elif self.mode == "tausurf":
return u.au
else:
self.logger.error("Unknown mode %s")
return u.dimensionless_unscaled
def plot_channel_map(
self,
window=500 * u.au,
cmap: Union[matplotlib.colors.Colormap, str] = None,
velocity_offset: u.km / u.s = 0 * u.km / u.s,
nx=4, ny=4,
) -> Figure:
symnorm = False
if cmap is None:
if self.mode == "tausurf":
cmap = "coolwarm"
symnorm = True
else:
cmap = "Blues"
cube = SpectralCube.read(self.file_fits)
center = cube.wcs.celestial.pixel_to_world(cube.shape[1] / 2, cube.shape[2] / 2)
self._check_distance()
radius = (
window / self.distance
).to(
u.arcsec, equivalencies=u.dimensionless_angles()
)
region = CircleSkyRegion(center, radius)
cube.allow_huge_operations = True
cube: SpectralCube = (
cube.subcube_from_regions([region])
.with_spectral_unit(u.km / u.s, velocity_convention="radio")
)
try:
cube = cube.to(self.unit_for_channel_map)
except ValueError as e:
if cube._beam is None:
cube = cube.with_beam(Beam(1e-10 * u.arcsec)).to(self.unit_for_channel_map)
else:
raise ValueError(e)
central_channel = cube.closest_spectral_channel(velocity_offset)
# downsampled: SpectralCube = cube.downsample_axis(4, axis=0).with_spectral_unit(u.km / u.s)
downsampled = cube[central_channel - nx * ny // 2: central_channel + nx * ny // 2 + 1]
maxval = round(0.9 * downsampled.max().value, 1)
if symnorm:
norm = Normalize(-maxval, maxval)
else:
norm = Normalize(0, maxval)
aspect_ratio = downsampled.shape[2] / float(downsampled.shape[1])
fig_smallest_dim_inches = 5
gridratio = ny / float(nx) * aspect_ratio
if gridratio > 1:
ysize = fig_smallest_dim_inches * gridratio
xsize = fig_smallest_dim_inches
else:
xsize = fig_smallest_dim_inches * gridratio
ysize = fig_smallest_dim_inches
fig: Figure = plt.figure(figsize=(xsize, ysize))
gs = GridSpec(nrows=ny, ncols=nx, hspace=0.0, wspace=0.0, figure=fig)
ax = None
window, window_unit = window.value, window.unit
window_half = round(window * 0.6, ndigits=-2)
for i in range(ny):
for j in range(nx):
ax = fig.add_subplot(
gs[i, j],
sharex=ax, sharey=ax
)
im = ax.imshow(
downsampled[i * nx + j].value,
cmap=cmap,
norm=norm,
origin="lower",
extent=[-window, window, -window, window],
)
self.finalize_plot(ax, downsampled, window)
this_spectral_coordinate = downsampled.spectral_axis[i * nx + j]
ax.text(
0.5,
0.9,
f"{this_spectral_coordinate.value: .1f} {this_spectral_coordinate.unit.to_string('latex_inline')}",
horizontalalignment='center',
verticalalignment='center',
transform=ax.transAxes,
fontsize="small"
)
if self.cloud is not None:
velocity = downsampled.spectral_axis[i * nx + j]
if self.cloud.velocity_min < velocity < self.cloud.velocity_max:
ax.scatter(0.5, 0.5, marker='x', color='red', alpha=0.2, transform=ax.transAxes, s=2000)
if i != ny - 1:
plt.setp(ax.get_xticklabels(), visible=False)
else:
ax.set_xticks([-window_half, 0, window_half])
if j != 0:
plt.setp(ax.get_yticklabels(), visible=False)
else:
ax.set_yticks([-window_half, 0, window_half])
if i == ny - 1 and j == 0:
ax.set_xlabel(f"[{window_unit.to_string('latex_inline')}]")
ax.set_ylabel(f"[{window_unit.to_string('latex_inline')}]", labelpad=-18)
else:
cax = fig.add_axes([0.90, 0.1, 0.03, 0.8])
cb = fig.colorbar(im, cax=cax)
cb.ax.set_xlabel(f"\n[{downsampled.unit.to_string('latex')}]")
gs.update(left=0.1, right=0.9, top=0.9, bottom=0.1)
fig.suptitle(f"{self.object_name} {chemical_names.from_string(self.line.molecule)}")
return fig
if platform.system() == "Windows":
logging.warning(
"RadMC is not supported on Windows out of the box (GNU make...). "
"To use it, install Windows Subsystem for Linux, and install the RadMC package in $PATH. "
"radmc3dpy is also not available for Windows. Please proceed with caution."
)
RADMC_DEFAULT_EXEC = "wsl radmc3d"
else:
RADMC_DEFAULT_EXEC = "radmc3d"
@dataclass
class RadMCBase(MapBase):
executable: PathLike = RADMC_DEFAULT_EXEC
folder: PathLike = 'radmc'
radii_bins: Union[None, int] = None
theta_bins: Union[None, int] = None
outer_radius: Union[None, u.Quantity] = None
verbosity: int = 0
wavelengths: u.Quantity = field(default=np.geomspace(0.0912, 1000, 100) * u.um)
modified_random_walk: bool = True
scattering_mode_max: int = None
nphot_therm: int = None
nphot_mono: int = None
nphot_spec: int = None
coordinate: Union[str, SkyCoord] = None
external_source_type: Literal["Draine1978", "WeingartnerDraine2001", "None"] = None
external_source_multiplier: float = 1
def __post_init__(self):
super().__post_init__()
if not self.table.is_in_zr_regular_grid:
raise CHEFNotImplementedError("Grids, which are not cartesian in r, z/r are not supported.")
try:
Path(self.folder).mkdir(parents=True, exist_ok=False)
except FileExistsError:
self.logger.warn("Directory %s already exists! The results can be biased.", self.folder)
if self.radii_bins is None:
radii = np.sort(np.unique(self.table.r)).to(u.cm)
if self.outer_radius is not None:
radii = radii[radii <= self.outer_radius]
self._check_outer_radius()
elif isinstance(self.radii_bins, int):
if self.outer_radius is None:
outer_radius = self.table.r.max()
else:
outer_radius = self.outer_radius
self._check_outer_radius()
radii = np.geomspace(self.table.r.min(), outer_radius, self.radii_bins).to(u.cm)
else:
raise CHEFTypeError("radii_bins should be None or int, not %s (%s)", type(self.radii_bins), self.radii_bins)
if self.theta_bins is None:
zr = u.Quantity(np.sort(np.unique(self.table.zr)))
elif isinstance(self.theta_bins, int):
zr = u.Quantity(np.linspace(self.table.zr.min(), self.table.zr.max(), self.theta_bins))
else:
raise CHEFTypeError("theta_bins should be None or int, not %s (%s)", type(self.theta_bins), self.theta_bins)
theta = np.pi / 2 * u.rad - np.arctan(zr)
self.radii_edges = u.Quantity([radii[0], *np.sqrt(radii[1:] * radii[:-1]), radii[-1]]).value
self.zr_edges = np.array([zr[0], *(0.5 * (zr[1:] + zr[:-1])), zr[-1]])
self.theta_edges = np.sort(np.pi / 2 - np.arctan(self.zr_edges))
R, THETA = np.meshgrid(radii, theta)
self.polar_table = CTable()
self.polar_table['Distance to star'] = R.flatten()
self.polar_table['Theta'] = THETA.flatten() << u.rad
self.polar_table['Altitude'] = (np.pi / 2 * u.rad - self.polar_table['Theta']) << u.rad
self.polar_table['Height'] = self.polar_table['Distance to star'] * np.sin(self.polar_table['Altitude'])
self.polar_table['Radius'] = self.polar_table['Distance to star'] * np.cos(self.polar_table['Altitude'])
self.polar_table.sort(['Theta', 'Distance to star'])
self.polar_table['Velocity R'] = 0 * u.cm / u.s
self.polar_table['Velocity Theta'] = 0 * u.cm / u.s
self.polar_table['Velocity Phi'] = (
np.sqrt(
c.G * self.chemistry.physics.star_mass
/ self.polar_table['Distance to star']
)
* np.sin(self.polar_table['Theta'])
).to(u.cm / u.s)
self.nrcells = (len(self.radii_edges) - 1) * (len(self.theta_edges) - 1)
self.fitsfiles = []
"""List of created FITS files"""
self.outputs = {}
def _check_outer_radius(self):
pass
# total_mass = np.trapz(self.table.r * np.trapz(self.table["Gas density"], self.table.z))
# total_mass_outside_outer_radius = np.trapz(self.table.r * )
# self.logger.warning("The outer radius %s contains only %s per cent of the total disk mass", self.outer_radius)
@property
def mode(self) -> str:
"""Mode of RadMC: `mctherm`, `image`, `spectrum`, `sed`"""
raise NotImplementedError
def catch_radmc_messages(self, proc: subprocess.CompletedProcess) -> None:
"""Raises RadMC warnings and errors in `self.logger`"""
if proc.stderr: self.logger.error(proc.stderr)
self.logger.debug(proc.stdout)
for match in re.finditer(r"WARNING:(.*\n(?: .*\n){2,})", proc.stdout):
if "In the molecular data file" in match.group(1):
self.logger.info(match.group(1))
else:
self.logger.warn(match.group(1))
for match in re.finditer(r"ERROR:(.*\n(?: .*\n){2,})", proc.stdout):
self.logger.error(match.group(1))
def interpolate(self, column: str) -> None:
"""Adds a new `column` to `self.polar_table` with the data iterpolated from `self.table`"""
self.polar_table[column] = self.table.interpolate(column)(self.polar_table.r, self.polar_table.z)
def interpolate_back(self, column: str) -> None:
"""Adds a new `column` to `self.table` with the data iterpolated from `self.polar_table`"""
self.table[column] = self.polar_table.interpolate(column)(self.table.r, self.table.z)
def create_files(self) -> None:
"""Creates all the files necessary to run RadMC3D"""
self.radmc3d()
self.wavelength_micron()
self.amr_grid()
self.logger.info("Files written to %s", self.folder)
def radmc3d(self, out_file: PathLike = None) -> None:
"""Creates an empty `radmc3d.inp` file"""
if out_file is None:
out_file = os.path.join(self.folder, 'radmc3d.inp')
with open(out_file, 'w') as file:
if self.scattering_mode_max is not None:
print(f"scattering_mode_max = {self.scattering_mode_max}", file=file)
if self.modified_random_walk:
print("modified_random_walk = 1", file=file)
if self.nphot_therm is not None:
print(f"nphot_therm = {int(self.nphot_therm)}", file=file)
if self.nphot_mono is not None:
print(f"nphot_mono = {int(self.nphot_mono)}", file=file)
if self.nphot_spec is not None:
print(f"nphot_spec = {int(self.nphot_spec)}", file=file)
def wavelength_micron(self, out_file: PathLike = None) -> None:
"""Creates a `wavelength_micron.inp` file"""
if out_file is None:
out_file = os.path.join(self.folder, 'wavelength_micron.inp')
with open(out_file, 'w') as file:
print(len(self.wavelengths), file=file)
print('\n'.join(f"{entry.to(u.um).value:.7e}" for entry in self.wavelengths), file=file)
def external_source(self, out_file: PathLike = None) -> None:
"""Creates an `external_source.inp` file"""
if self.external_source_type is None or self.external_source_type == "None":
return
elif self.external_source_type == "Draine1978":
isrf = diskchef.maps.radiation_fields.draine1978
elif self.external_source_type == "WeingartnerDraine2001":
isrf = diskchef.maps.radiation_fields.weingartner_draine_2001
else:
raise CHEFNotImplementedError("Unsupported ISRF")
if out_file is None:
out_file = os.path.join(self.folder, 'external_source.inp')
with open(out_file, 'w') as file:
print("2", file=file)
print(len(self.wavelengths), file=file)
print('\n'.join(f"{entry.to(u.um).value:.7e}" for entry in self.wavelengths), file=file)
print(
'\n'.join(
f"{entry:.7e}"
for entry in self.external_source_multiplier * isrf(self.wavelengths).to(
u.erg / u.cm ** 2 / u.s / u.Hz / u.sr).value),
file=file)
def amr_grid(self, out_file: PathLike = None) -> None:
"""Creates a `amr_grid.inp` file"""
if not self.table.is_in_zr_regular_grid:
raise CHEFNotImplementedError
if out_file is None:
out_file = os.path.join(self.folder, 'amr_grid.inp')
with open(out_file, 'w') as file:
print('1', file=file) # Typically 1 at present
print('0', file=file) # Grid style (regular = 0)
print('100', file=file) # Spherical coordinate system
print('1' if self.verbosity else '0', file=file) # Grid info
print('1 1 0', file=file) # Included coordinates
print(len(self.radii_edges) - 1, len(self.theta_edges) - 1, 1, file=file)
print(' '.join(f"{entry:.7e}" for entry in self.radii_edges), file=file)
print(' '.join(f"{entry:.7e}" for entry in self.theta_edges), file=file)
print(0, 2 * np.pi, file=file)
@u.quantity_input
def run(
self,
inclination: u.deg = 0 * u.deg, position_angle: u.deg = 0 * u.deg,
distance: u.pc = 140 * u.pc, velocity_offset: u.km / u.s = 0 * u.km / u.s,
threads: int = 1, npix: int = 100,
) -> None:
"""Run RadMC3D after files were created with `create_files()`"""
raise CHEFNotImplementedError
@dataclass
class RadMCVisualize:
folder: PathLike = '.'
line: Line = None
def channel_map(self) -> None:
logging.warning("Deprecated")
for file in sorted(glob.glob(os.path.join(self.folder, "*image.out"))):
im = radmc3dPy.image.readImage(fname=file)
normalizer = Normalize(vmin=im.image.min(), vmax=im.image.max())
nrow = 5
ncol = 8
fig, axes = plt.subplots(
nrow, ncol,
sharey='row', sharex='col',
gridspec_kw=dict(wspace=0.0, hspace=0.0,
top=1. - 0.5 / (nrow + 1), bottom=0.5 / (nrow + 1),
left=0.5 / (ncol + 1), right=1 - 0.5 / (ncol + 1)),
figsize=(ncol + 1, nrow + 1)
)
for i, ax in enumerate(axes.flatten()):
ax.imshow(im.image[:, :, i], norm=normalizer)
axes_f = axes.flatten()
cbaxes = fig.add_axes(
[axes_f[-1].figbox.x1, axes_f[-1].figbox.y0,
0.1 * (axes_f[-1].figbox.x1 - axes_f[-1].figbox.x0),
axes_f[-1].figbox.y1 - axes_f[-1].figbox.y0]
)
plt.subplots_adjust(hspace=0.01, wspace=0.01)
cbim = axes_f[-1].images[0]
clbr = fig.colorbar(cbim, cax=cbaxes)
fig.suptitle(file)
fig.savefig(file + ".png")
Classes
class Cloud (velocity_min: astropy.units.quantity.Quantity, velocity_max: astropy.units.quantity.Quantity)
-
Tuple to store the foreground and background to mark on channel maps. Used in RadMCOutput
Expand source code
class Cloud(NamedTuple): """Tuple to store the foreground and background to mark on channel maps. Used in RadMCOutput""" velocity_min: u.Quantity velocity_max: u.Quantity
Ancestors
- builtins.tuple
Instance variables
var velocity_max : astropy.units.quantity.Quantity
-
Alias for field number 1
var velocity_min : astropy.units.quantity.Quantity
-
Alias for field number 0
class RadMCBase (chemistry: Union[ChemistryBase, PhysicsBase] = None, line_list: List[Line] = None, executable: Union[str, os.PathLike] = 'radmc3d', folder: Union[str, os.PathLike] = 'radmc', radii_bins: Optional[None] = None, theta_bins: Optional[None] = None, outer_radius: Optional[None] = None, verbosity: int = 0, wavelengths: astropy.units.quantity.Quantity = <Quantity [9.12000000e-02, 1.00185062e-01, 1.10055336e-01, 1.20898034e-01, 1.32808958e-01, 1.45893352e-01, 1.60266826e-01, 1.76056380e-01, 1.93401528e-01, 2.12455528e-01, 2.33386735e-01, 2.56380093e-01, 2.81638767e-01, 3.09385935e-01, 3.39866766e-01, 3.73350582e-01, 4.10133237e-01, 4.50539734e-01, 4.94927096e-01, 5.43687520e-01, 5.97251841e-01, 6.56093341e-01, 7.20731931e-01, 7.91738741e-01, 8.69741171e-01, 9.55428434e-01, 1.04955764e+00, 1.15296050e+00, 1.26655065e+00, 1.39133174e+00, 1.52840633e+00, 1.67898555e+00, 1.84439991e+00, 2.02611096e+00, 2.22572425e+00, 2.44500354e+00, 2.68588630e+00, 2.95050094e+00, 3.24118552e+00, 3.56050847e+00, 3.91129125e+00, 4.29663329e+00, 4.71993939e+00, 5.18494979e+00, 5.69577320e+00, 6.25692315e+00, 6.87335783e+00, 7.55052390e+00, 8.29440466e+00, 9.11157287e+00, 1.00092488e+01, 1.09953642e+01, 1.20786320e+01, 1.32686239e+01, 1.45758543e+01, 1.60118735e+01, 1.75893699e+01, 1.93222820e+01, 2.12259213e+01, 2.33171079e+01, 2.56143191e+01, 2.81378525e+01, 3.09100054e+01, 3.39552720e+01, 3.73005596e+01, 4.09754262e+01, 4.50123423e+01, 4.94469770e+01, 5.43185138e+01, 5.96699964e+01, 6.55487093e+01, 7.20065955e+01, 7.91007152e+01, 8.68937507e+01, 9.54545592e+01, 1.04858782e+02, 1.15189513e+02, 1.26538032e+02, 1.39004611e+02, 1.52699404e+02, 1.67743413e+02, 1.84269563e+02, 2.02423878e+02, 2.22366762e+02, 2.44274428e+02, 2.68340447e+02, 2.94777460e+02, 3.23819058e+02, 3.55721846e+02, 3.90767711e+02, 4.29266308e+02, 4.71557804e+02, 5.18015875e+02, 5.69051015e+02, 6.25114158e+02, 6.86700665e+02, 7.54354701e+02, 8.28674040e+02, 9.10315352e+02, 1.00000000e+03] um>, modified_random_walk: bool = True, scattering_mode_max: int = None, nphot_therm: int = None, nphot_mono: int = None, nphot_spec: int = None, coordinate: Union[str, astropy.coordinates.sky_coordinate.SkyCoord] = None, external_source_type: Literal['Draine1978', 'WeingartnerDraine2001', 'None'] = None, external_source_multiplier: float = 1)
-
RadMCBase(chemistry: Union[diskchef.chemistry.base.ChemistryBase, diskchef.physics.base.PhysicsBase] = None, line_list: List[diskchef.lamda.line.Line] = None, executable: Union[str, os.PathLike] = 'radmc3d', folder: Union[str, os.PathLike] = 'radmc', radii_bins: Optional[int] = None, theta_bins: Optional[int] = None, outer_radius: Optional[astropy.units.quantity.Quantity] = None, verbosity: int = 0, wavelengths: astropy.units.quantity.Quantity =
, modified_random_walk: bool = True, scattering_mode_max: int = None, nphot_therm: int = None, nphot_mono: int = None, nphot_spec: int = None, coordinate: Union[str, astropy.coordinates.sky_coordinate.SkyCoord] = None, external_source_type: Literal['Draine1978', 'WeingartnerDraine2001', 'None'] = None, external_source_multiplier: float = 1) Expand source code
@dataclass class RadMCBase(MapBase): executable: PathLike = RADMC_DEFAULT_EXEC folder: PathLike = 'radmc' radii_bins: Union[None, int] = None theta_bins: Union[None, int] = None outer_radius: Union[None, u.Quantity] = None verbosity: int = 0 wavelengths: u.Quantity = field(default=np.geomspace(0.0912, 1000, 100) * u.um) modified_random_walk: bool = True scattering_mode_max: int = None nphot_therm: int = None nphot_mono: int = None nphot_spec: int = None coordinate: Union[str, SkyCoord] = None external_source_type: Literal["Draine1978", "WeingartnerDraine2001", "None"] = None external_source_multiplier: float = 1 def __post_init__(self): super().__post_init__() if not self.table.is_in_zr_regular_grid: raise CHEFNotImplementedError("Grids, which are not cartesian in r, z/r are not supported.") try: Path(self.folder).mkdir(parents=True, exist_ok=False) except FileExistsError: self.logger.warn("Directory %s already exists! The results can be biased.", self.folder) if self.radii_bins is None: radii = np.sort(np.unique(self.table.r)).to(u.cm) if self.outer_radius is not None: radii = radii[radii <= self.outer_radius] self._check_outer_radius() elif isinstance(self.radii_bins, int): if self.outer_radius is None: outer_radius = self.table.r.max() else: outer_radius = self.outer_radius self._check_outer_radius() radii = np.geomspace(self.table.r.min(), outer_radius, self.radii_bins).to(u.cm) else: raise CHEFTypeError("radii_bins should be None or int, not %s (%s)", type(self.radii_bins), self.radii_bins) if self.theta_bins is None: zr = u.Quantity(np.sort(np.unique(self.table.zr))) elif isinstance(self.theta_bins, int): zr = u.Quantity(np.linspace(self.table.zr.min(), self.table.zr.max(), self.theta_bins)) else: raise CHEFTypeError("theta_bins should be None or int, not %s (%s)", type(self.theta_bins), self.theta_bins) theta = np.pi / 2 * u.rad - np.arctan(zr) self.radii_edges = u.Quantity([radii[0], *np.sqrt(radii[1:] * radii[:-1]), radii[-1]]).value self.zr_edges = np.array([zr[0], *(0.5 * (zr[1:] + zr[:-1])), zr[-1]]) self.theta_edges = np.sort(np.pi / 2 - np.arctan(self.zr_edges)) R, THETA = np.meshgrid(radii, theta) self.polar_table = CTable() self.polar_table['Distance to star'] = R.flatten() self.polar_table['Theta'] = THETA.flatten() << u.rad self.polar_table['Altitude'] = (np.pi / 2 * u.rad - self.polar_table['Theta']) << u.rad self.polar_table['Height'] = self.polar_table['Distance to star'] * np.sin(self.polar_table['Altitude']) self.polar_table['Radius'] = self.polar_table['Distance to star'] * np.cos(self.polar_table['Altitude']) self.polar_table.sort(['Theta', 'Distance to star']) self.polar_table['Velocity R'] = 0 * u.cm / u.s self.polar_table['Velocity Theta'] = 0 * u.cm / u.s self.polar_table['Velocity Phi'] = ( np.sqrt( c.G * self.chemistry.physics.star_mass / self.polar_table['Distance to star'] ) * np.sin(self.polar_table['Theta']) ).to(u.cm / u.s) self.nrcells = (len(self.radii_edges) - 1) * (len(self.theta_edges) - 1) self.fitsfiles = [] """List of created FITS files""" self.outputs = {} def _check_outer_radius(self): pass # total_mass = np.trapz(self.table.r * np.trapz(self.table["Gas density"], self.table.z)) # total_mass_outside_outer_radius = np.trapz(self.table.r * ) # self.logger.warning("The outer radius %s contains only %s per cent of the total disk mass", self.outer_radius) @property def mode(self) -> str: """Mode of RadMC: `mctherm`, `image`, `spectrum`, `sed`""" raise NotImplementedError def catch_radmc_messages(self, proc: subprocess.CompletedProcess) -> None: """Raises RadMC warnings and errors in `self.logger`""" if proc.stderr: self.logger.error(proc.stderr) self.logger.debug(proc.stdout) for match in re.finditer(r"WARNING:(.*\n(?: .*\n){2,})", proc.stdout): if "In the molecular data file" in match.group(1): self.logger.info(match.group(1)) else: self.logger.warn(match.group(1)) for match in re.finditer(r"ERROR:(.*\n(?: .*\n){2,})", proc.stdout): self.logger.error(match.group(1)) def interpolate(self, column: str) -> None: """Adds a new `column` to `self.polar_table` with the data iterpolated from `self.table`""" self.polar_table[column] = self.table.interpolate(column)(self.polar_table.r, self.polar_table.z) def interpolate_back(self, column: str) -> None: """Adds a new `column` to `self.table` with the data iterpolated from `self.polar_table`""" self.table[column] = self.polar_table.interpolate(column)(self.table.r, self.table.z) def create_files(self) -> None: """Creates all the files necessary to run RadMC3D""" self.radmc3d() self.wavelength_micron() self.amr_grid() self.logger.info("Files written to %s", self.folder) def radmc3d(self, out_file: PathLike = None) -> None: """Creates an empty `radmc3d.inp` file""" if out_file is None: out_file = os.path.join(self.folder, 'radmc3d.inp') with open(out_file, 'w') as file: if self.scattering_mode_max is not None: print(f"scattering_mode_max = {self.scattering_mode_max}", file=file) if self.modified_random_walk: print("modified_random_walk = 1", file=file) if self.nphot_therm is not None: print(f"nphot_therm = {int(self.nphot_therm)}", file=file) if self.nphot_mono is not None: print(f"nphot_mono = {int(self.nphot_mono)}", file=file) if self.nphot_spec is not None: print(f"nphot_spec = {int(self.nphot_spec)}", file=file) def wavelength_micron(self, out_file: PathLike = None) -> None: """Creates a `wavelength_micron.inp` file""" if out_file is None: out_file = os.path.join(self.folder, 'wavelength_micron.inp') with open(out_file, 'w') as file: print(len(self.wavelengths), file=file) print('\n'.join(f"{entry.to(u.um).value:.7e}" for entry in self.wavelengths), file=file) def external_source(self, out_file: PathLike = None) -> None: """Creates an `external_source.inp` file""" if self.external_source_type is None or self.external_source_type == "None": return elif self.external_source_type == "Draine1978": isrf = diskchef.maps.radiation_fields.draine1978 elif self.external_source_type == "WeingartnerDraine2001": isrf = diskchef.maps.radiation_fields.weingartner_draine_2001 else: raise CHEFNotImplementedError("Unsupported ISRF") if out_file is None: out_file = os.path.join(self.folder, 'external_source.inp') with open(out_file, 'w') as file: print("2", file=file) print(len(self.wavelengths), file=file) print('\n'.join(f"{entry.to(u.um).value:.7e}" for entry in self.wavelengths), file=file) print( '\n'.join( f"{entry:.7e}" for entry in self.external_source_multiplier * isrf(self.wavelengths).to( u.erg / u.cm ** 2 / u.s / u.Hz / u.sr).value), file=file) def amr_grid(self, out_file: PathLike = None) -> None: """Creates a `amr_grid.inp` file""" if not self.table.is_in_zr_regular_grid: raise CHEFNotImplementedError if out_file is None: out_file = os.path.join(self.folder, 'amr_grid.inp') with open(out_file, 'w') as file: print('1', file=file) # Typically 1 at present print('0', file=file) # Grid style (regular = 0) print('100', file=file) # Spherical coordinate system print('1' if self.verbosity else '0', file=file) # Grid info print('1 1 0', file=file) # Included coordinates print(len(self.radii_edges) - 1, len(self.theta_edges) - 1, 1, file=file) print(' '.join(f"{entry:.7e}" for entry in self.radii_edges), file=file) print(' '.join(f"{entry:.7e}" for entry in self.theta_edges), file=file) print(0, 2 * np.pi, file=file) @u.quantity_input def run( self, inclination: u.deg = 0 * u.deg, position_angle: u.deg = 0 * u.deg, distance: u.pc = 140 * u.pc, velocity_offset: u.km / u.s = 0 * u.km / u.s, threads: int = 1, npix: int = 100, ) -> None: """Run RadMC3D after files were created with `create_files()`""" raise CHEFNotImplementedError
Ancestors
Subclasses
Class variables
var coordinate : Union[str, astropy.coordinates.sky_coordinate.SkyCoord]
var executable : Union[str, os.PathLike]
var external_source_multiplier : float
var external_source_type : Literal['Draine1978', 'WeingartnerDraine2001', 'None']
var folder : Union[str, os.PathLike]
var modified_random_walk : bool
var nphot_mono : int
var nphot_spec : int
var nphot_therm : int
var outer_radius : Optional[None]
var radii_bins : Optional[None]
var scattering_mode_max : int
var theta_bins : Optional[None]
var verbosity : int
var wavelengths : astropy.units.quantity.Quantity
Instance variables
var mode : str
-
Mode of RadMC:
mctherm
,image
,spectrum
,sed
Expand source code
@property def mode(self) -> str: """Mode of RadMC: `mctherm`, `image`, `spectrum`, `sed`""" raise NotImplementedError
Methods
def amr_grid(self, out_file: Union[str, os.PathLike] = None) ‑> None
-
Creates a
amr_grid.inp
fileExpand source code
def amr_grid(self, out_file: PathLike = None) -> None: """Creates a `amr_grid.inp` file""" if not self.table.is_in_zr_regular_grid: raise CHEFNotImplementedError if out_file is None: out_file = os.path.join(self.folder, 'amr_grid.inp') with open(out_file, 'w') as file: print('1', file=file) # Typically 1 at present print('0', file=file) # Grid style (regular = 0) print('100', file=file) # Spherical coordinate system print('1' if self.verbosity else '0', file=file) # Grid info print('1 1 0', file=file) # Included coordinates print(len(self.radii_edges) - 1, len(self.theta_edges) - 1, 1, file=file) print(' '.join(f"{entry:.7e}" for entry in self.radii_edges), file=file) print(' '.join(f"{entry:.7e}" for entry in self.theta_edges), file=file) print(0, 2 * np.pi, file=file)
def catch_radmc_messages(self, proc: subprocess.CompletedProcess) ‑> None
-
Raises RadMC warnings and errors in
self.logger
Expand source code
def catch_radmc_messages(self, proc: subprocess.CompletedProcess) -> None: """Raises RadMC warnings and errors in `self.logger`""" if proc.stderr: self.logger.error(proc.stderr) self.logger.debug(proc.stdout) for match in re.finditer(r"WARNING:(.*\n(?: .*\n){2,})", proc.stdout): if "In the molecular data file" in match.group(1): self.logger.info(match.group(1)) else: self.logger.warn(match.group(1)) for match in re.finditer(r"ERROR:(.*\n(?: .*\n){2,})", proc.stdout): self.logger.error(match.group(1))
def create_files(self) ‑> None
-
Creates all the files necessary to run RadMC3D
Expand source code
def create_files(self) -> None: """Creates all the files necessary to run RadMC3D""" self.radmc3d() self.wavelength_micron() self.amr_grid() self.logger.info("Files written to %s", self.folder)
def external_source(self, out_file: Union[str, os.PathLike] = None) ‑> None
-
Creates an
external_source.inp
fileExpand source code
def external_source(self, out_file: PathLike = None) -> None: """Creates an `external_source.inp` file""" if self.external_source_type is None or self.external_source_type == "None": return elif self.external_source_type == "Draine1978": isrf = diskchef.maps.radiation_fields.draine1978 elif self.external_source_type == "WeingartnerDraine2001": isrf = diskchef.maps.radiation_fields.weingartner_draine_2001 else: raise CHEFNotImplementedError("Unsupported ISRF") if out_file is None: out_file = os.path.join(self.folder, 'external_source.inp') with open(out_file, 'w') as file: print("2", file=file) print(len(self.wavelengths), file=file) print('\n'.join(f"{entry.to(u.um).value:.7e}" for entry in self.wavelengths), file=file) print( '\n'.join( f"{entry:.7e}" for entry in self.external_source_multiplier * isrf(self.wavelengths).to( u.erg / u.cm ** 2 / u.s / u.Hz / u.sr).value), file=file)
def interpolate(self, column: str) ‑> None
-
Adds a new
column
toself.polar_table
with the data iterpolated fromself.table
Expand source code
def interpolate(self, column: str) -> None: """Adds a new `column` to `self.polar_table` with the data iterpolated from `self.table`""" self.polar_table[column] = self.table.interpolate(column)(self.polar_table.r, self.polar_table.z)
def interpolate_back(self, column: str) ‑> None
-
Adds a new
column
toself.table
with the data iterpolated fromself.polar_table
Expand source code
def interpolate_back(self, column: str) -> None: """Adds a new `column` to `self.table` with the data iterpolated from `self.polar_table`""" self.table[column] = self.polar_table.interpolate(column)(self.table.r, self.table.z)
def radmc3d(self, out_file: Union[str, os.PathLike] = None) ‑> None
-
Creates an empty
radmc3d.inp
fileExpand source code
def radmc3d(self, out_file: PathLike = None) -> None: """Creates an empty `radmc3d.inp` file""" if out_file is None: out_file = os.path.join(self.folder, 'radmc3d.inp') with open(out_file, 'w') as file: if self.scattering_mode_max is not None: print(f"scattering_mode_max = {self.scattering_mode_max}", file=file) if self.modified_random_walk: print("modified_random_walk = 1", file=file) if self.nphot_therm is not None: print(f"nphot_therm = {int(self.nphot_therm)}", file=file) if self.nphot_mono is not None: print(f"nphot_mono = {int(self.nphot_mono)}", file=file) if self.nphot_spec is not None: print(f"nphot_spec = {int(self.nphot_spec)}", file=file)
def run(self, inclination: Unit("deg") = <Quantity 0. deg>, position_angle: Unit("deg") = <Quantity 0. deg>, distance: Unit("pc") = <Quantity 140. pc>, velocity_offset: Unit("km / s") = <Quantity 0. km / s>, threads: int = 1, npix: int = 100) ‑> None
-
Run RadMC3D after files were created with
create_files()
Expand source code
@u.quantity_input def run( self, inclination: u.deg = 0 * u.deg, position_angle: u.deg = 0 * u.deg, distance: u.pc = 140 * u.pc, velocity_offset: u.km / u.s = 0 * u.km / u.s, threads: int = 1, npix: int = 100, ) -> None: """Run RadMC3D after files were created with `create_files()`""" raise CHEFNotImplementedError
def wavelength_micron(self, out_file: Union[str, os.PathLike] = None) ‑> None
-
Creates a
wavelength_micron.inp
fileExpand source code
def wavelength_micron(self, out_file: PathLike = None) -> None: """Creates a `wavelength_micron.inp` file""" if out_file is None: out_file = os.path.join(self.folder, 'wavelength_micron.inp') with open(out_file, 'w') as file: print(len(self.wavelengths), file=file) print('\n'.join(f"{entry.to(u.um).value:.7e}" for entry in self.wavelengths), file=file)
class RadMCOutput (line: Line, file_radmc: Union[str, os.PathLike] = None, file_fits: Union[str, os.PathLike] = None, object_name: str = '', distance: Unit("pc") = None, cloud: Cloud = None, mode: Literal['image', 'image tracetau'] = 'image')
-
Class to store information and visualize the RadMCRT module output
Expand source code
@dataclass class RadMCOutput: """Class to store information and visualize the RadMCRT module output""" line: Line file_radmc: PathLike = None file_fits: PathLike = None object_name: str = "" distance: u.pc = None cloud: Cloud = None mode: Literal["image", "image tracetau"] = "image" def __post_init__(self): self.logger = logging.getLogger(__name__ + '.' + self.__class__.__qualname__) self.logger.info("Creating an instance of %s", self.__class__.__qualname__) self.logger.debug("With parameters: %s", self.__dict__) @staticmethod def gauss(x, H, A, x0, sigma): """Gaussian function with a background""" return H + A * np.exp(-(x - x0) ** 2 / (2 * sigma ** 2)) @classmethod def gauss_fit(cls, x, y): """Fit a gaussian into data""" mean = sum(x * y) / sum(y) sigma = np.sqrt(sum(y * (x - mean) ** 2) / sum(y)) popt, pcov = curve_fit(cls.gauss, x, y, p0=[min(y), max(y), mean, sigma]) return popt @classmethod def sigma_from_data(cls, data: np.ndarray) -> float: """Find rms of the data by fitting a gaussian into the histogram (assuming most of the pixels are noise)""" values, edges = np.histogram(data, bins=100, density=True) means = 0.5 * (edges[1:] + edges[:-1]) params = cls.gauss_fit(means, values) # plt.step(means, values, where="mid") # print(params) # plt.plot(means, cls.gauss(means, *params)) return params[3] def finalize_plot(self, axes, cube, window, ctr_data=None, ellcolor="black", **kwargs): """Finalize the channel or moment map figure.""" dist_pc = self.distance.to_value(u.pc) pxsize = np.sqrt(np.abs(np.linalg.det(cube.wcs.celestial.pixel_scale_matrix))) try: axes.coords[0].set_major_formatter('hh:mm:ss') axes.coords[1].set_major_formatter('dd:mm:ss') axes.coords[0].set_ticks(spacing=5 * u.arcsec) # , format="hh:mm:ss") axes.coords[1].set_ticks(spacing=5 * u.arcsec) # , format="dd:mm:ss") axes.coords[0].set_ticklabel(exclude_overlapping=True) axes.coords[1].set_ticklabel(exclude_overlapping=True) except AttributeError: pass axes.scatter( 0, 0, color='white', marker='x', zorder=100 ) if ctr_data is not None: noise = self.sigma_from_data(ctr_data) noise_value = getattr(noise, "value", noise) ctr_3sigma = axes.contour( ctr_data, levels=[3 * noise_value], colors='black', linewidths=0.5, ) else: ctr_3sigma = None try: ell = matplotlib.patches.Ellipse( (-0.8 * window, -0.8 * window), cube.beam.major.to_value(u.arcsec) * dist_pc, cube.beam.minor.to_value(u.arcsec) * dist_pc, cube.beam.pa.value + 90, hatch='///', color=ellcolor ) axes.add_patch(ell) except spectral_cube.utils.NoBeamError as e: pass return {"ctr_3sigma": ctr_3sigma} def _check_distance(self): if self.distance is None: try: self.logger.warning( "Distance is not set. Querying from Simbad, but it is not free. " "Provide `distance` argument to RadMCOutput instead") from astroquery.simbad import Simbad simquery = Simbad() simquery.add_votable_fields('distance', 'velocity') self.distance = simquery.query_object(self.object_name)[0]["Distance_distance"] * u.pc except ImportError: self.logger.warning( "Distance to the object was not set. astroquery is not installed. Cannot query Simbad." ) except Exception as e: self.logger.warning("Something went wrong querying Simbad: %s", e) @cached_property def unit_for_channel_map(self) -> u.Unit: if self.mode == "image": return u.K elif self.mode == "image tracetau": return u.dimensionless_unscaled elif self.mode == "tausurf": return u.au else: self.logger.error("Unknown mode %s") return u.dimensionless_unscaled def plot_channel_map( self, window=500 * u.au, cmap: Union[matplotlib.colors.Colormap, str] = None, velocity_offset: u.km / u.s = 0 * u.km / u.s, nx=4, ny=4, ) -> Figure: symnorm = False if cmap is None: if self.mode == "tausurf": cmap = "coolwarm" symnorm = True else: cmap = "Blues" cube = SpectralCube.read(self.file_fits) center = cube.wcs.celestial.pixel_to_world(cube.shape[1] / 2, cube.shape[2] / 2) self._check_distance() radius = ( window / self.distance ).to( u.arcsec, equivalencies=u.dimensionless_angles() ) region = CircleSkyRegion(center, radius) cube.allow_huge_operations = True cube: SpectralCube = ( cube.subcube_from_regions([region]) .with_spectral_unit(u.km / u.s, velocity_convention="radio") ) try: cube = cube.to(self.unit_for_channel_map) except ValueError as e: if cube._beam is None: cube = cube.with_beam(Beam(1e-10 * u.arcsec)).to(self.unit_for_channel_map) else: raise ValueError(e) central_channel = cube.closest_spectral_channel(velocity_offset) # downsampled: SpectralCube = cube.downsample_axis(4, axis=0).with_spectral_unit(u.km / u.s) downsampled = cube[central_channel - nx * ny // 2: central_channel + nx * ny // 2 + 1] maxval = round(0.9 * downsampled.max().value, 1) if symnorm: norm = Normalize(-maxval, maxval) else: norm = Normalize(0, maxval) aspect_ratio = downsampled.shape[2] / float(downsampled.shape[1]) fig_smallest_dim_inches = 5 gridratio = ny / float(nx) * aspect_ratio if gridratio > 1: ysize = fig_smallest_dim_inches * gridratio xsize = fig_smallest_dim_inches else: xsize = fig_smallest_dim_inches * gridratio ysize = fig_smallest_dim_inches fig: Figure = plt.figure(figsize=(xsize, ysize)) gs = GridSpec(nrows=ny, ncols=nx, hspace=0.0, wspace=0.0, figure=fig) ax = None window, window_unit = window.value, window.unit window_half = round(window * 0.6, ndigits=-2) for i in range(ny): for j in range(nx): ax = fig.add_subplot( gs[i, j], sharex=ax, sharey=ax ) im = ax.imshow( downsampled[i * nx + j].value, cmap=cmap, norm=norm, origin="lower", extent=[-window, window, -window, window], ) self.finalize_plot(ax, downsampled, window) this_spectral_coordinate = downsampled.spectral_axis[i * nx + j] ax.text( 0.5, 0.9, f"{this_spectral_coordinate.value: .1f} {this_spectral_coordinate.unit.to_string('latex_inline')}", horizontalalignment='center', verticalalignment='center', transform=ax.transAxes, fontsize="small" ) if self.cloud is not None: velocity = downsampled.spectral_axis[i * nx + j] if self.cloud.velocity_min < velocity < self.cloud.velocity_max: ax.scatter(0.5, 0.5, marker='x', color='red', alpha=0.2, transform=ax.transAxes, s=2000) if i != ny - 1: plt.setp(ax.get_xticklabels(), visible=False) else: ax.set_xticks([-window_half, 0, window_half]) if j != 0: plt.setp(ax.get_yticklabels(), visible=False) else: ax.set_yticks([-window_half, 0, window_half]) if i == ny - 1 and j == 0: ax.set_xlabel(f"[{window_unit.to_string('latex_inline')}]") ax.set_ylabel(f"[{window_unit.to_string('latex_inline')}]", labelpad=-18) else: cax = fig.add_axes([0.90, 0.1, 0.03, 0.8]) cb = fig.colorbar(im, cax=cax) cb.ax.set_xlabel(f"\n[{downsampled.unit.to_string('latex')}]") gs.update(left=0.1, right=0.9, top=0.9, bottom=0.1) fig.suptitle(f"{self.object_name} {chemical_names.from_string(self.line.molecule)}") return fig
Class variables
var cloud : Cloud
var distance : Unit("pc")
var file_fits : Union[str, os.PathLike]
var file_radmc : Union[str, os.PathLike]
var line : Line
var mode : Literal['image', 'image tracetau']
var object_name : str
Static methods
def gauss(x, H, A, x0, sigma)
-
Gaussian function with a background
Expand source code
@staticmethod def gauss(x, H, A, x0, sigma): """Gaussian function with a background""" return H + A * np.exp(-(x - x0) ** 2 / (2 * sigma ** 2))
def gauss_fit(x, y)
-
Fit a gaussian into data
Expand source code
@classmethod def gauss_fit(cls, x, y): """Fit a gaussian into data""" mean = sum(x * y) / sum(y) sigma = np.sqrt(sum(y * (x - mean) ** 2) / sum(y)) popt, pcov = curve_fit(cls.gauss, x, y, p0=[min(y), max(y), mean, sigma]) return popt
def sigma_from_data(data: numpy.ndarray) ‑> float
-
Find rms of the data by fitting a gaussian into the histogram (assuming most of the pixels are noise)
Expand source code
@classmethod def sigma_from_data(cls, data: np.ndarray) -> float: """Find rms of the data by fitting a gaussian into the histogram (assuming most of the pixels are noise)""" values, edges = np.histogram(data, bins=100, density=True) means = 0.5 * (edges[1:] + edges[:-1]) params = cls.gauss_fit(means, values) # plt.step(means, values, where="mid") # print(params) # plt.plot(means, cls.gauss(means, *params)) return params[3]
Instance variables
var unit_for_channel_map
-
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: with self.lock: # check if another thread filled cache while we awaited lock val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val
Methods
def finalize_plot(self, axes, cube, window, ctr_data=None, ellcolor='black', **kwargs)
-
Finalize the channel or moment map figure.
Expand source code
def finalize_plot(self, axes, cube, window, ctr_data=None, ellcolor="black", **kwargs): """Finalize the channel or moment map figure.""" dist_pc = self.distance.to_value(u.pc) pxsize = np.sqrt(np.abs(np.linalg.det(cube.wcs.celestial.pixel_scale_matrix))) try: axes.coords[0].set_major_formatter('hh:mm:ss') axes.coords[1].set_major_formatter('dd:mm:ss') axes.coords[0].set_ticks(spacing=5 * u.arcsec) # , format="hh:mm:ss") axes.coords[1].set_ticks(spacing=5 * u.arcsec) # , format="dd:mm:ss") axes.coords[0].set_ticklabel(exclude_overlapping=True) axes.coords[1].set_ticklabel(exclude_overlapping=True) except AttributeError: pass axes.scatter( 0, 0, color='white', marker='x', zorder=100 ) if ctr_data is not None: noise = self.sigma_from_data(ctr_data) noise_value = getattr(noise, "value", noise) ctr_3sigma = axes.contour( ctr_data, levels=[3 * noise_value], colors='black', linewidths=0.5, ) else: ctr_3sigma = None try: ell = matplotlib.patches.Ellipse( (-0.8 * window, -0.8 * window), cube.beam.major.to_value(u.arcsec) * dist_pc, cube.beam.minor.to_value(u.arcsec) * dist_pc, cube.beam.pa.value + 90, hatch='///', color=ellcolor ) axes.add_patch(ell) except spectral_cube.utils.NoBeamError as e: pass return {"ctr_3sigma": ctr_3sigma}
def plot_channel_map(self, window=<Quantity 500. AU>, cmap: Union[matplotlib.colors.Colormap, str] = None, velocity_offset: Unit("km / s") = <Quantity 0. km / s>, nx=4, ny=4) ‑> matplotlib.figure.Figure
-
Expand source code
def plot_channel_map( self, window=500 * u.au, cmap: Union[matplotlib.colors.Colormap, str] = None, velocity_offset: u.km / u.s = 0 * u.km / u.s, nx=4, ny=4, ) -> Figure: symnorm = False if cmap is None: if self.mode == "tausurf": cmap = "coolwarm" symnorm = True else: cmap = "Blues" cube = SpectralCube.read(self.file_fits) center = cube.wcs.celestial.pixel_to_world(cube.shape[1] / 2, cube.shape[2] / 2) self._check_distance() radius = ( window / self.distance ).to( u.arcsec, equivalencies=u.dimensionless_angles() ) region = CircleSkyRegion(center, radius) cube.allow_huge_operations = True cube: SpectralCube = ( cube.subcube_from_regions([region]) .with_spectral_unit(u.km / u.s, velocity_convention="radio") ) try: cube = cube.to(self.unit_for_channel_map) except ValueError as e: if cube._beam is None: cube = cube.with_beam(Beam(1e-10 * u.arcsec)).to(self.unit_for_channel_map) else: raise ValueError(e) central_channel = cube.closest_spectral_channel(velocity_offset) # downsampled: SpectralCube = cube.downsample_axis(4, axis=0).with_spectral_unit(u.km / u.s) downsampled = cube[central_channel - nx * ny // 2: central_channel + nx * ny // 2 + 1] maxval = round(0.9 * downsampled.max().value, 1) if symnorm: norm = Normalize(-maxval, maxval) else: norm = Normalize(0, maxval) aspect_ratio = downsampled.shape[2] / float(downsampled.shape[1]) fig_smallest_dim_inches = 5 gridratio = ny / float(nx) * aspect_ratio if gridratio > 1: ysize = fig_smallest_dim_inches * gridratio xsize = fig_smallest_dim_inches else: xsize = fig_smallest_dim_inches * gridratio ysize = fig_smallest_dim_inches fig: Figure = plt.figure(figsize=(xsize, ysize)) gs = GridSpec(nrows=ny, ncols=nx, hspace=0.0, wspace=0.0, figure=fig) ax = None window, window_unit = window.value, window.unit window_half = round(window * 0.6, ndigits=-2) for i in range(ny): for j in range(nx): ax = fig.add_subplot( gs[i, j], sharex=ax, sharey=ax ) im = ax.imshow( downsampled[i * nx + j].value, cmap=cmap, norm=norm, origin="lower", extent=[-window, window, -window, window], ) self.finalize_plot(ax, downsampled, window) this_spectral_coordinate = downsampled.spectral_axis[i * nx + j] ax.text( 0.5, 0.9, f"{this_spectral_coordinate.value: .1f} {this_spectral_coordinate.unit.to_string('latex_inline')}", horizontalalignment='center', verticalalignment='center', transform=ax.transAxes, fontsize="small" ) if self.cloud is not None: velocity = downsampled.spectral_axis[i * nx + j] if self.cloud.velocity_min < velocity < self.cloud.velocity_max: ax.scatter(0.5, 0.5, marker='x', color='red', alpha=0.2, transform=ax.transAxes, s=2000) if i != ny - 1: plt.setp(ax.get_xticklabels(), visible=False) else: ax.set_xticks([-window_half, 0, window_half]) if j != 0: plt.setp(ax.get_yticklabels(), visible=False) else: ax.set_yticks([-window_half, 0, window_half]) if i == ny - 1 and j == 0: ax.set_xlabel(f"[{window_unit.to_string('latex_inline')}]") ax.set_ylabel(f"[{window_unit.to_string('latex_inline')}]", labelpad=-18) else: cax = fig.add_axes([0.90, 0.1, 0.03, 0.8]) cb = fig.colorbar(im, cax=cax) cb.ax.set_xlabel(f"\n[{downsampled.unit.to_string('latex')}]") gs.update(left=0.1, right=0.9, top=0.9, bottom=0.1) fig.suptitle(f"{self.object_name} {chemical_names.from_string(self.line.molecule)}") return fig
class RadMCVisualize (folder: Union[str, os.PathLike] = '.', line: Line = None)
-
RadMCVisualize(folder: Union[str, os.PathLike] = '.', line: diskchef.lamda.line.Line = None)
Expand source code
@dataclass class RadMCVisualize: folder: PathLike = '.' line: Line = None def channel_map(self) -> None: logging.warning("Deprecated") for file in sorted(glob.glob(os.path.join(self.folder, "*image.out"))): im = radmc3dPy.image.readImage(fname=file) normalizer = Normalize(vmin=im.image.min(), vmax=im.image.max()) nrow = 5 ncol = 8 fig, axes = plt.subplots( nrow, ncol, sharey='row', sharex='col', gridspec_kw=dict(wspace=0.0, hspace=0.0, top=1. - 0.5 / (nrow + 1), bottom=0.5 / (nrow + 1), left=0.5 / (ncol + 1), right=1 - 0.5 / (ncol + 1)), figsize=(ncol + 1, nrow + 1) ) for i, ax in enumerate(axes.flatten()): ax.imshow(im.image[:, :, i], norm=normalizer) axes_f = axes.flatten() cbaxes = fig.add_axes( [axes_f[-1].figbox.x1, axes_f[-1].figbox.y0, 0.1 * (axes_f[-1].figbox.x1 - axes_f[-1].figbox.x0), axes_f[-1].figbox.y1 - axes_f[-1].figbox.y0] ) plt.subplots_adjust(hspace=0.01, wspace=0.01) cbim = axes_f[-1].images[0] clbr = fig.colorbar(cbim, cax=cbaxes) fig.suptitle(file) fig.savefig(file + ".png")
Class variables
var folder : Union[str, os.PathLike]
var line : Line
Methods
def channel_map(self) ‑> None
-
Expand source code
def channel_map(self) -> None: logging.warning("Deprecated") for file in sorted(glob.glob(os.path.join(self.folder, "*image.out"))): im = radmc3dPy.image.readImage(fname=file) normalizer = Normalize(vmin=im.image.min(), vmax=im.image.max()) nrow = 5 ncol = 8 fig, axes = plt.subplots( nrow, ncol, sharey='row', sharex='col', gridspec_kw=dict(wspace=0.0, hspace=0.0, top=1. - 0.5 / (nrow + 1), bottom=0.5 / (nrow + 1), left=0.5 / (ncol + 1), right=1 - 0.5 / (ncol + 1)), figsize=(ncol + 1, nrow + 1) ) for i, ax in enumerate(axes.flatten()): ax.imshow(im.image[:, :, i], norm=normalizer) axes_f = axes.flatten() cbaxes = fig.add_axes( [axes_f[-1].figbox.x1, axes_f[-1].figbox.y0, 0.1 * (axes_f[-1].figbox.x1 - axes_f[-1].figbox.x0), axes_f[-1].figbox.y1 - axes_f[-1].figbox.y0] ) plt.subplots_adjust(hspace=0.01, wspace=0.01) cbim = axes_f[-1].images[0] clbr = fig.colorbar(cbim, cax=cbaxes) fig.suptitle(file) fig.savefig(file + ".png")