Module diskchef.maps.radmcrt
Classes
class Cloud (velocity_min: astropy.units.quantity.Quantity,
velocity_max: astropy.units.quantity.Quantity)-
Expand source code
class Cloud(NamedTuple): """Tuple to store the foreground and background to mark on channel maps. Used in RadMCOutput""" velocity_min: u.Quantity velocity_max: u.Quantity
Tuple to store the foreground and background to mark on channel maps. Used in RadMCOutput
Ancestors
- builtins.tuple
Instance variables
var velocity_max : astropy.units.quantity.Quantity
-
Expand source code
class Cloud(NamedTuple): """Tuple to store the foreground and background to mark on channel maps. Used in RadMCOutput""" velocity_min: u.Quantity velocity_max: u.Quantity
Alias for field number 1
var velocity_min : astropy.units.quantity.Quantity
-
Expand source code
class Cloud(NamedTuple): """Tuple to store the foreground and background to mark on channel maps. Used in RadMCOutput""" velocity_min: u.Quantity velocity_max: u.Quantity
Alias for field number 0
class RadMCBase (chemistry: ChemistryBase | PhysicsBase = None,
line_list: List[Line] = None,
executable: str | os.PathLike = 'radmc3d',
folder: str | os.PathLike = 'radmc',
radii_bins: int | None = None,
theta_bins: int | None = None,
outer_radius: astropy.units.quantity.Quantity | None = None,
verbosity: int = 0,
wavelengths: astropy.units.quantity.Quantity = <Quantity [9.12000000e-02, 1.00185062e-01, 1.10055336e-01, 1.20898034e-01, 1.32808958e-01, 1.45893352e-01, 1.60266826e-01, 1.76056380e-01, 1.93401528e-01, 2.12455528e-01, 2.33386735e-01, 2.56380093e-01, 2.81638767e-01, 3.09385935e-01, 3.39866766e-01, 3.73350582e-01, 4.10133237e-01, 4.50539734e-01, 4.94927096e-01, 5.43687520e-01, 5.97251841e-01, 6.56093341e-01, 7.20731931e-01, 7.91738741e-01, 8.69741171e-01, 9.55428434e-01, 1.04955764e+00, 1.15296050e+00, 1.26655065e+00, 1.39133174e+00, 1.52840633e+00, 1.67898555e+00, 1.84439991e+00, 2.02611096e+00, 2.22572425e+00, 2.44500354e+00, 2.68588630e+00, 2.95050094e+00, 3.24118552e+00, 3.56050847e+00, 3.91129125e+00, 4.29663329e+00, 4.71993939e+00, 5.18494979e+00, 5.69577320e+00, 6.25692315e+00, 6.87335783e+00, 7.55052390e+00, 8.29440466e+00, 9.11157287e+00, 1.00092488e+01, 1.09953642e+01, 1.20786320e+01, 1.32686239e+01, 1.45758543e+01, 1.60118735e+01, 1.75893699e+01, 1.93222820e+01, 2.12259213e+01, 2.33171079e+01, 2.56143191e+01, 2.81378525e+01, 3.09100054e+01, 3.39552720e+01, 3.73005596e+01, 4.09754262e+01, 4.50123423e+01, 4.94469770e+01, 5.43185138e+01, 5.96699964e+01, 6.55487093e+01, 7.20065955e+01, 7.91007152e+01, 8.68937507e+01, 9.54545592e+01, 1.04858782e+02, 1.15189513e+02, 1.26538032e+02, 1.39004611e+02, 1.52699404e+02, 1.67743413e+02, 1.84269563e+02, 2.02423878e+02, 2.22366762e+02, 2.44274428e+02, 2.68340447e+02, 2.94777460e+02, 3.23819058e+02, 3.55721846e+02, 3.90767711e+02, 4.29266308e+02, 4.71557804e+02, 5.18015875e+02, 5.69051015e+02, 6.25114158e+02, 6.86700665e+02, 7.54354701e+02, 8.28674040e+02, 9.10315352e+02, 1.00000000e+03] um>,
modified_random_walk: bool = True,
scattering_mode_max: int = None,
nphot_therm: int = None,
nphot_mono: int = None,
nphot_spec: int = None,
coordinate: str | astropy.coordinates.sky_coordinate.SkyCoord = None,
external_source_type: Literal['Draine1978', 'WeingartnerDraine2001', 'None'] = None,
external_source_multiplier: float = 1)-
Expand source code
@dataclass class RadMCBase(MapBase): executable: PathLike = RADMC_DEFAULT_EXEC folder: PathLike = 'radmc' radii_bins: Union[None, int] = None theta_bins: Union[None, int] = None outer_radius: Union[None, u.Quantity] = None verbosity: int = 0 wavelengths: u.Quantity = field(default=np.geomspace(0.0912, 1000, 100) * u.um) modified_random_walk: bool = True scattering_mode_max: int = None nphot_therm: int = None nphot_mono: int = None nphot_spec: int = None coordinate: Union[str, SkyCoord] = None external_source_type: Literal["Draine1978", "WeingartnerDraine2001", "None"] = None external_source_multiplier: float = 1 def __post_init__(self): super().__post_init__() if not self.table.is_in_zr_regular_grid: raise CHEFNotImplementedError("Grids, which are not cartesian in r, z/r are not supported.") try: Path(self.folder).mkdir(parents=True, exist_ok=False) except FileExistsError: self.logger.warn("Directory %s already exists! The results can be biased.", self.folder) if self.radii_bins is None: radii = np.sort(np.unique(self.table.r)).to(u.cm) if self.outer_radius is not None: radii = radii[radii <= self.outer_radius] self._check_outer_radius() elif isinstance(self.radii_bins, int): if self.outer_radius is None: outer_radius = self.table.r.max() else: outer_radius = self.outer_radius self._check_outer_radius() radii = np.geomspace(self.table.r.min(), outer_radius, self.radii_bins).to(u.cm) else: raise CHEFTypeError("radii_bins should be None or int, not %s (%s)", type(self.radii_bins), self.radii_bins) if self.theta_bins is None: zr = u.Quantity(np.sort(np.unique(self.table.zr))) elif isinstance(self.theta_bins, int): zr = u.Quantity(np.linspace(self.table.zr.min(), self.table.zr.max(), self.theta_bins)) else: raise CHEFTypeError("theta_bins should be None or int, not %s (%s)", type(self.theta_bins), self.theta_bins) theta = np.pi / 2 * u.rad - np.arctan(zr) self.radii_edges = u.Quantity([radii[0], *np.sqrt(radii[1:] * radii[:-1]), radii[-1]]).value self.zr_edges = np.array([zr[0], *(0.5 * (zr[1:] + zr[:-1])), zr[-1]]) self.theta_edges = np.sort(np.pi / 2 - np.arctan(self.zr_edges)) R, THETA = np.meshgrid(radii, theta) self.polar_table = CTable() self.polar_table['Distance to star'] = R.flatten() self.polar_table['Theta'] = THETA.flatten() << u.rad self.polar_table['Altitude'] = (np.pi / 2 * u.rad - self.polar_table['Theta']) << u.rad self.polar_table['Height'] = self.polar_table['Distance to star'] * np.sin(self.polar_table['Altitude']) self.polar_table['Radius'] = self.polar_table['Distance to star'] * np.cos(self.polar_table['Altitude']) self.polar_table.sort(['Theta', 'Distance to star']) self.polar_table['Velocity R'] = 0 * u.cm / u.s self.polar_table['Velocity Theta'] = 0 * u.cm / u.s self.polar_table['Velocity Phi'] = ( np.sqrt( c.G * self.chemistry.physics.star_mass / self.polar_table['Distance to star'] ) * np.sin(self.polar_table['Theta']) ).to(u.cm / u.s) self.nrcells = (len(self.radii_edges) - 1) * (len(self.theta_edges) - 1) self.fitsfiles = [] """List of created FITS files""" self.outputs = {} def _check_outer_radius(self): pass # total_mass = np.trapz(self.table.r * np.trapz(self.table["Gas density"], self.table.z)) # total_mass_outside_outer_radius = np.trapz(self.table.r * ) # self.logger.warning("The outer radius %s contains only %s per cent of the total disk mass", self.outer_radius) @property def mode(self) -> str: """Mode of RadMC: `mctherm`, `image`, `spectrum`, `sed`""" raise NotImplementedError def catch_radmc_messages(self, proc: subprocess.CompletedProcess) -> None: """Raises RadMC warnings and errors in `self.logger`""" if proc.stderr: self.logger.error(proc.stderr) self.logger.debug(proc.stdout) for match in re.finditer(r"WARNING:(.*\n(?: .*\n){2,})", proc.stdout): if "In the molecular data file" in match.group(1): self.logger.info(match.group(1)) else: self.logger.warn(match.group(1)) for match in re.finditer(r"ERROR:(.*\n(?: .*\n){2,})", proc.stdout): self.logger.error(match.group(1)) def interpolate(self, column: str) -> None: """Adds a new `column` to `self.polar_table` with the data iterpolated from `self.table`""" self.polar_table[column] = self.table.interpolate(column)(self.polar_table.r, self.polar_table.z) def interpolate_back(self, column: str) -> None: """Adds a new `column` to `self.table` with the data iterpolated from `self.polar_table`""" self.table[column] = self.polar_table.interpolate(column)(self.table.r, self.table.z) def create_files(self) -> None: """Creates all the files necessary to run RadMC3D""" self.radmc3d() self.wavelength_micron() self.amr_grid() self.logger.info("Files written to %s", self.folder) def radmc3d(self, out_file: PathLike = None) -> None: """Creates an empty `radmc3d.inp` file""" if out_file is None: out_file = os.path.join(self.folder, 'radmc3d.inp') with open(out_file, 'w') as file: if self.scattering_mode_max is not None: print(f"scattering_mode_max = {self.scattering_mode_max}", file=file) if self.modified_random_walk: print("modified_random_walk = 1", file=file) if self.nphot_therm is not None: print(f"nphot_therm = {int(self.nphot_therm)}", file=file) if self.nphot_mono is not None: print(f"nphot_mono = {int(self.nphot_mono)}", file=file) if self.nphot_spec is not None: print(f"nphot_spec = {int(self.nphot_spec)}", file=file) def wavelength_micron(self, out_file: PathLike = None) -> None: """Creates a `wavelength_micron.inp` file""" if out_file is None: out_file = os.path.join(self.folder, 'wavelength_micron.inp') with open(out_file, 'w') as file: print(len(self.wavelengths), file=file) print('\n'.join(f"{entry.to(u.um).value:.7e}" for entry in self.wavelengths), file=file) def external_source(self, out_file: PathLike = None) -> None: """Creates an `external_source.inp` file""" if self.external_source_type is None or self.external_source_type == "None": return elif self.external_source_type == "Draine1978": isrf = diskchef.maps.radiation_fields.draine1978 elif self.external_source_type == "WeingartnerDraine2001": isrf = diskchef.maps.radiation_fields.weingartner_draine_2001 else: raise CHEFNotImplementedError("Unsupported ISRF") if out_file is None: out_file = os.path.join(self.folder, 'external_source.inp') with open(out_file, 'w') as file: print("2", file=file) print(len(self.wavelengths), file=file) print('\n'.join(f"{entry.to(u.um).value:.7e}" for entry in self.wavelengths), file=file) print( '\n'.join( f"{entry:.7e}" for entry in self.external_source_multiplier * isrf(self.wavelengths).to( u.erg / u.cm ** 2 / u.s / u.Hz / u.sr).value), file=file) def amr_grid(self, out_file: PathLike = None) -> None: """Creates a `amr_grid.inp` file""" if not self.table.is_in_zr_regular_grid: raise CHEFNotImplementedError if out_file is None: out_file = os.path.join(self.folder, 'amr_grid.inp') with open(out_file, 'w') as file: print('1', file=file) # Typically 1 at present print('0', file=file) # Grid style (regular = 0) print('100', file=file) # Spherical coordinate system print('1' if self.verbosity else '0', file=file) # Grid info print('1 1 0', file=file) # Included coordinates print(len(self.radii_edges) - 1, len(self.theta_edges) - 1, 1, file=file) print(' '.join(f"{entry:.7e}" for entry in self.radii_edges), file=file) print(' '.join(f"{entry:.7e}" for entry in self.theta_edges), file=file) print(0, 2 * np.pi, file=file) @u.quantity_input def run( self, inclination: u.deg = 0 * u.deg, position_angle: u.deg = 0 * u.deg, distance: u.pc = 140 * u.pc, velocity_offset: u.km / u.s = 0 * u.km / u.s, threads: int = 1, npix: int = 100, ) -> None: """Run RadMC3D after files were created with `create_files()`""" raise CHEFNotImplementedError
RadMCBase(chemistry: Union[diskchef.chemistry.base.ChemistryBase, diskchef.physics.base.PhysicsBase] = None, line_list: List[diskchef.lamda.line.Line] = None, executable: Union[str, os.PathLike] = 'radmc3d', folder: Union[str, os.PathLike] = 'radmc', radii_bins: Optional[int] = None, theta_bins: Optional[int] = None, outer_radius: Optional[astropy.units.quantity.Quantity] = None, verbosity: int = 0, wavelengths: astropy.units.quantity.Quantity =
, modified_random_walk: bool = True, scattering_mode_max: int = None, nphot_therm: int = None, nphot_mono: int = None, nphot_spec: int = None, coordinate: Union[str, astropy.coordinates.sky_coordinate.SkyCoord] = None, external_source_type: Literal['Draine1978', 'WeingartnerDraine2001', 'None'] = None, external_source_multiplier: float = 1) Ancestors
Subclasses
Instance variables
var coordinate : str | astropy.coordinates.sky_coordinate.SkyCoord
var executable : str | os.PathLike
var external_source_multiplier : float
var external_source_type : Literal['Draine1978', 'WeingartnerDraine2001', 'None']
var folder : str | os.PathLike
prop mode : str
-
Expand source code
@property def mode(self) -> str: """Mode of RadMC: `mctherm`, `image`, `spectrum`, `sed`""" raise NotImplementedError
Mode of RadMC:
mctherm
,image
,spectrum
,sed
var modified_random_walk : bool
var nphot_mono : int
var nphot_spec : int
var nphot_therm : int
var outer_radius : astropy.units.quantity.Quantity | None
var radii_bins : int | None
var scattering_mode_max : int
var theta_bins : int | None
var verbosity : int
var wavelengths : astropy.units.quantity.Quantity
Methods
def amr_grid(self, out_file: str | os.PathLike = None) ‑> None
-
Expand source code
def amr_grid(self, out_file: PathLike = None) -> None: """Creates a `amr_grid.inp` file""" if not self.table.is_in_zr_regular_grid: raise CHEFNotImplementedError if out_file is None: out_file = os.path.join(self.folder, 'amr_grid.inp') with open(out_file, 'w') as file: print('1', file=file) # Typically 1 at present print('0', file=file) # Grid style (regular = 0) print('100', file=file) # Spherical coordinate system print('1' if self.verbosity else '0', file=file) # Grid info print('1 1 0', file=file) # Included coordinates print(len(self.radii_edges) - 1, len(self.theta_edges) - 1, 1, file=file) print(' '.join(f"{entry:.7e}" for entry in self.radii_edges), file=file) print(' '.join(f"{entry:.7e}" for entry in self.theta_edges), file=file) print(0, 2 * np.pi, file=file)
Creates a
amr_grid.inp
file def catch_radmc_messages(self, proc: subprocess.CompletedProcess) ‑> None
-
Expand source code
def catch_radmc_messages(self, proc: subprocess.CompletedProcess) -> None: """Raises RadMC warnings and errors in `self.logger`""" if proc.stderr: self.logger.error(proc.stderr) self.logger.debug(proc.stdout) for match in re.finditer(r"WARNING:(.*\n(?: .*\n){2,})", proc.stdout): if "In the molecular data file" in match.group(1): self.logger.info(match.group(1)) else: self.logger.warn(match.group(1)) for match in re.finditer(r"ERROR:(.*\n(?: .*\n){2,})", proc.stdout): self.logger.error(match.group(1))
Raises RadMC warnings and errors in
self.logger
def create_files(self) ‑> None
-
Expand source code
def create_files(self) -> None: """Creates all the files necessary to run RadMC3D""" self.radmc3d() self.wavelength_micron() self.amr_grid() self.logger.info("Files written to %s", self.folder)
Creates all the files necessary to run RadMC3D
def external_source(self, out_file: str | os.PathLike = None) ‑> None
-
Expand source code
def external_source(self, out_file: PathLike = None) -> None: """Creates an `external_source.inp` file""" if self.external_source_type is None or self.external_source_type == "None": return elif self.external_source_type == "Draine1978": isrf = diskchef.maps.radiation_fields.draine1978 elif self.external_source_type == "WeingartnerDraine2001": isrf = diskchef.maps.radiation_fields.weingartner_draine_2001 else: raise CHEFNotImplementedError("Unsupported ISRF") if out_file is None: out_file = os.path.join(self.folder, 'external_source.inp') with open(out_file, 'w') as file: print("2", file=file) print(len(self.wavelengths), file=file) print('\n'.join(f"{entry.to(u.um).value:.7e}" for entry in self.wavelengths), file=file) print( '\n'.join( f"{entry:.7e}" for entry in self.external_source_multiplier * isrf(self.wavelengths).to( u.erg / u.cm ** 2 / u.s / u.Hz / u.sr).value), file=file)
Creates an
external_source.inp
file def interpolate(self, column: str) ‑> None
-
Expand source code
def interpolate(self, column: str) -> None: """Adds a new `column` to `self.polar_table` with the data iterpolated from `self.table`""" self.polar_table[column] = self.table.interpolate(column)(self.polar_table.r, self.polar_table.z)
Adds a new
column
toself.polar_table
with the data iterpolated fromself.table
def interpolate_back(self, column: str) ‑> None
-
Expand source code
def interpolate_back(self, column: str) -> None: """Adds a new `column` to `self.table` with the data iterpolated from `self.polar_table`""" self.table[column] = self.polar_table.interpolate(column)(self.table.r, self.table.z)
Adds a new
column
toself.table
with the data iterpolated fromself.polar_table
def radmc3d(self, out_file: str | os.PathLike = None) ‑> None
-
Expand source code
def radmc3d(self, out_file: PathLike = None) -> None: """Creates an empty `radmc3d.inp` file""" if out_file is None: out_file = os.path.join(self.folder, 'radmc3d.inp') with open(out_file, 'w') as file: if self.scattering_mode_max is not None: print(f"scattering_mode_max = {self.scattering_mode_max}", file=file) if self.modified_random_walk: print("modified_random_walk = 1", file=file) if self.nphot_therm is not None: print(f"nphot_therm = {int(self.nphot_therm)}", file=file) if self.nphot_mono is not None: print(f"nphot_mono = {int(self.nphot_mono)}", file=file) if self.nphot_spec is not None: print(f"nphot_spec = {int(self.nphot_spec)}", file=file)
Creates an empty
radmc3d.inp
file def run(self,
inclination: Unit("deg") = <Quantity 0. deg>,
position_angle: Unit("deg") = <Quantity 0. deg>,
distance: Unit("pc") = <Quantity 140. pc>,
velocity_offset: Unit("km / s") = <Quantity 0. km / s>,
threads: int = 1,
npix: int = 100) ‑> None-
Expand source code
@u.quantity_input def run( self, inclination: u.deg = 0 * u.deg, position_angle: u.deg = 0 * u.deg, distance: u.pc = 140 * u.pc, velocity_offset: u.km / u.s = 0 * u.km / u.s, threads: int = 1, npix: int = 100, ) -> None: """Run RadMC3D after files were created with `create_files()`""" raise CHEFNotImplementedError
Run RadMC3D after files were created with
create_files()
def wavelength_micron(self, out_file: str | os.PathLike = None) ‑> None
-
Expand source code
def wavelength_micron(self, out_file: PathLike = None) -> None: """Creates a `wavelength_micron.inp` file""" if out_file is None: out_file = os.path.join(self.folder, 'wavelength_micron.inp') with open(out_file, 'w') as file: print(len(self.wavelengths), file=file) print('\n'.join(f"{entry.to(u.um).value:.7e}" for entry in self.wavelengths), file=file)
Creates a
wavelength_micron.inp
file
class RadMCOutput (line: Line,
file_radmc: str | os.PathLike = None,
file_fits: str | os.PathLike = None,
object_name: str = '',
distance: Unit("pc") = None,
cloud: Cloud = None,
mode: Literal['image', 'image tracetau'] = 'image')-
Expand source code
@dataclass class RadMCOutput: """Class to store information and visualize the RadMCRT module output""" line: Line file_radmc: PathLike = None file_fits: PathLike = None object_name: str = "" distance: u.pc = None cloud: Cloud = None mode: Literal["image", "image tracetau"] = "image" def __post_init__(self): self.logger = logging.getLogger(__name__ + '.' + self.__class__.__qualname__) self.logger.info("Creating an instance of %s", self.__class__.__qualname__) self.logger.debug("With parameters: %s", self.__dict__) @staticmethod def gauss(x, H, A, x0, sigma): """Gaussian function with a background""" return H + A * np.exp(-(x - x0) ** 2 / (2 * sigma ** 2)) @classmethod def gauss_fit(cls, x, y): """Fit a gaussian into data""" mean = sum(x * y) / sum(y) sigma = np.sqrt(sum(y * (x - mean) ** 2) / sum(y)) popt, pcov = curve_fit(cls.gauss, x, y, p0=[min(y), max(y), mean, sigma]) return popt @classmethod def sigma_from_data(cls, data: np.ndarray) -> float: """Find rms of the data by fitting a gaussian into the histogram (assuming most of the pixels are noise)""" values, edges = np.histogram(data, bins=100, density=True) means = 0.5 * (edges[1:] + edges[:-1]) params = cls.gauss_fit(means, values) # plt.step(means, values, where="mid") # print(params) # plt.plot(means, cls.gauss(means, *params)) return params[3] def finalize_plot(self, axes, cube, window, ctr_data=None, ellcolor="black", **kwargs): """Finalize the channel or moment map figure.""" dist_pc = self.distance.to_value(u.pc) pxsize = np.sqrt(np.abs(np.linalg.det(cube.wcs.celestial.pixel_scale_matrix))) try: axes.coords[0].set_major_formatter('hh:mm:ss') axes.coords[1].set_major_formatter('dd:mm:ss') axes.coords[0].set_ticks(spacing=5 * u.arcsec) # , format="hh:mm:ss") axes.coords[1].set_ticks(spacing=5 * u.arcsec) # , format="dd:mm:ss") axes.coords[0].set_ticklabel(exclude_overlapping=True) axes.coords[1].set_ticklabel(exclude_overlapping=True) except AttributeError: pass axes.scatter( 0, 0, color='white', marker='x', zorder=100 ) if ctr_data is not None: noise = self.sigma_from_data(ctr_data) noise_value = getattr(noise, "value", noise) ctr_3sigma = axes.contour( ctr_data, levels=[3 * noise_value], colors='black', linewidths=0.5, ) else: ctr_3sigma = None try: ell = matplotlib.patches.Ellipse( (-0.8 * window, -0.8 * window), cube.beam.major.to_value(u.arcsec) * dist_pc, cube.beam.minor.to_value(u.arcsec) * dist_pc, cube.beam.pa.value + 90, hatch='///', color=ellcolor ) axes.add_patch(ell) except spectral_cube.utils.NoBeamError as e: pass return {"ctr_3sigma": ctr_3sigma} def _check_distance(self): if self.distance is None: try: self.logger.warning( "Distance is not set. Querying from Simbad, but it is not free. " "Provide `distance` argument to RadMCOutput instead") from astroquery.simbad import Simbad simquery = Simbad() simquery.add_votable_fields('distance', 'velocity') self.distance = simquery.query_object(self.object_name)[0]["Distance_distance"] * u.pc except ImportError: self.logger.warning( "Distance to the object was not set. astroquery is not installed. Cannot query Simbad." ) except Exception as e: self.logger.warning("Something went wrong querying Simbad: %s", e) @cached_property def unit_for_channel_map(self) -> u.Unit: if self.mode == "image": return u.K elif self.mode == "image tracetau": return u.dimensionless_unscaled elif self.mode == "tausurf": return u.au else: self.logger.error("Unknown mode %s") return u.dimensionless_unscaled def plot_channel_map( self, window=500 * u.au, cmap: Union[matplotlib.colors.Colormap, str] = None, velocity_offset: u.km / u.s = 0 * u.km / u.s, nx=4, ny=4, ) -> Figure: symnorm = False if cmap is None: if self.mode == "tausurf": cmap = "coolwarm" symnorm = True else: cmap = "Blues" cube = SpectralCube.read(self.file_fits) center = cube.wcs.celestial.pixel_to_world(cube.shape[1] / 2, cube.shape[2] / 2) self._check_distance() radius = ( window / self.distance ).to( u.arcsec, equivalencies=u.dimensionless_angles() ) region = CircleSkyRegion(center, radius) cube.allow_huge_operations = True cube: SpectralCube = ( cube.subcube_from_regions([region]) .with_spectral_unit(u.km / u.s, velocity_convention="radio") ) try: cube = cube.to(self.unit_for_channel_map) except ValueError as e: if cube._beam is None: cube = cube.with_beam(Beam(1e-10 * u.arcsec)).to(self.unit_for_channel_map) else: raise ValueError(e) central_channel = cube.closest_spectral_channel(velocity_offset) # downsampled: SpectralCube = cube.downsample_axis(4, axis=0).with_spectral_unit(u.km / u.s) downsampled = cube[central_channel - nx * ny // 2: central_channel + nx * ny // 2 + 1] maxval = round(0.9 * downsampled.max().value, 1) if symnorm: norm = Normalize(-maxval, maxval) else: norm = Normalize(0, maxval) aspect_ratio = downsampled.shape[2] / float(downsampled.shape[1]) fig_smallest_dim_inches = 5 gridratio = ny / float(nx) * aspect_ratio if gridratio > 1: ysize = fig_smallest_dim_inches * gridratio xsize = fig_smallest_dim_inches else: xsize = fig_smallest_dim_inches * gridratio ysize = fig_smallest_dim_inches fig: Figure = plt.figure(figsize=(xsize, ysize)) gs = GridSpec(nrows=ny, ncols=nx, hspace=0.0, wspace=0.0, figure=fig) ax = None window, window_unit = window.value, window.unit window_half = round(window * 0.6, ndigits=-2) for i in range(ny): for j in range(nx): ax = fig.add_subplot( gs[i, j], sharex=ax, sharey=ax ) im = ax.imshow( downsampled[i * nx + j].value, cmap=cmap, norm=norm, origin="lower", extent=[-window, window, -window, window], ) self.finalize_plot(ax, downsampled, window) this_spectral_coordinate = downsampled.spectral_axis[i * nx + j] ax.text( 0.5, 0.9, f"{this_spectral_coordinate.value: .1f} {this_spectral_coordinate.unit.to_string('latex_inline')}", horizontalalignment='center', verticalalignment='center', transform=ax.transAxes, fontsize="small" ) if self.cloud is not None: velocity = downsampled.spectral_axis[i * nx + j] if self.cloud.velocity_min < velocity < self.cloud.velocity_max: ax.scatter(0.5, 0.5, marker='x', color='red', alpha=0.2, transform=ax.transAxes, s=2000) if i != ny - 1: plt.setp(ax.get_xticklabels(), visible=False) else: ax.set_xticks([-window_half, 0, window_half]) if j != 0: plt.setp(ax.get_yticklabels(), visible=False) else: ax.set_yticks([-window_half, 0, window_half]) if i == ny - 1 and j == 0: ax.set_xlabel(f"[{window_unit.to_string('latex_inline')}]") ax.set_ylabel(f"[{window_unit.to_string('latex_inline')}]", labelpad=-18) else: cax = fig.add_axes([0.90, 0.1, 0.03, 0.8]) cb = fig.colorbar(im, cax=cax) cb.ax.set_xlabel(f"\n[{downsampled.unit.to_string('latex')}]") gs.update(left=0.1, right=0.9, top=0.9, bottom=0.1) fig.suptitle(f"{self.object_name} {chemical_names.from_string(self.line.molecule)}") return fig
Class to store information and visualize the RadMCRT module output
Static methods
def gauss(x, H, A, x0, sigma)
-
Expand source code
@staticmethod def gauss(x, H, A, x0, sigma): """Gaussian function with a background""" return H + A * np.exp(-(x - x0) ** 2 / (2 * sigma ** 2))
Gaussian function with a background
def gauss_fit(x, y)
-
Fit a gaussian into data
def sigma_from_data(data: numpy.ndarray) ‑> float
-
Find rms of the data by fitting a gaussian into the histogram (assuming most of the pixels are noise)
Instance variables
var cloud : Cloud
var distance : Unit("pc")
var file_fits : str | os.PathLike
var file_radmc : str | os.PathLike
var line : Line
var mode : Literal['image', 'image tracetau']
var object_name : str
var unit_for_channel_map : astropy.units.core.Unit
-
Expand source code
@cached_property def unit_for_channel_map(self) -> u.Unit: if self.mode == "image": return u.K elif self.mode == "image tracetau": return u.dimensionless_unscaled elif self.mode == "tausurf": return u.au else: self.logger.error("Unknown mode %s") return u.dimensionless_unscaled
Methods
def finalize_plot(self, axes, cube, window, ctr_data=None, ellcolor='black', **kwargs)
-
Expand source code
def finalize_plot(self, axes, cube, window, ctr_data=None, ellcolor="black", **kwargs): """Finalize the channel or moment map figure.""" dist_pc = self.distance.to_value(u.pc) pxsize = np.sqrt(np.abs(np.linalg.det(cube.wcs.celestial.pixel_scale_matrix))) try: axes.coords[0].set_major_formatter('hh:mm:ss') axes.coords[1].set_major_formatter('dd:mm:ss') axes.coords[0].set_ticks(spacing=5 * u.arcsec) # , format="hh:mm:ss") axes.coords[1].set_ticks(spacing=5 * u.arcsec) # , format="dd:mm:ss") axes.coords[0].set_ticklabel(exclude_overlapping=True) axes.coords[1].set_ticklabel(exclude_overlapping=True) except AttributeError: pass axes.scatter( 0, 0, color='white', marker='x', zorder=100 ) if ctr_data is not None: noise = self.sigma_from_data(ctr_data) noise_value = getattr(noise, "value", noise) ctr_3sigma = axes.contour( ctr_data, levels=[3 * noise_value], colors='black', linewidths=0.5, ) else: ctr_3sigma = None try: ell = matplotlib.patches.Ellipse( (-0.8 * window, -0.8 * window), cube.beam.major.to_value(u.arcsec) * dist_pc, cube.beam.minor.to_value(u.arcsec) * dist_pc, cube.beam.pa.value + 90, hatch='///', color=ellcolor ) axes.add_patch(ell) except spectral_cube.utils.NoBeamError as e: pass return {"ctr_3sigma": ctr_3sigma}
Finalize the channel or moment map figure.
def plot_channel_map(self,
window=<Quantity 500. AU>,
cmap: matplotlib.colors.Colormap | str = None,
velocity_offset: Unit("km / s") = <Quantity 0. km / s>,
nx=4,
ny=4) ‑> matplotlib.figure.Figure-
Expand source code
def plot_channel_map( self, window=500 * u.au, cmap: Union[matplotlib.colors.Colormap, str] = None, velocity_offset: u.km / u.s = 0 * u.km / u.s, nx=4, ny=4, ) -> Figure: symnorm = False if cmap is None: if self.mode == "tausurf": cmap = "coolwarm" symnorm = True else: cmap = "Blues" cube = SpectralCube.read(self.file_fits) center = cube.wcs.celestial.pixel_to_world(cube.shape[1] / 2, cube.shape[2] / 2) self._check_distance() radius = ( window / self.distance ).to( u.arcsec, equivalencies=u.dimensionless_angles() ) region = CircleSkyRegion(center, radius) cube.allow_huge_operations = True cube: SpectralCube = ( cube.subcube_from_regions([region]) .with_spectral_unit(u.km / u.s, velocity_convention="radio") ) try: cube = cube.to(self.unit_for_channel_map) except ValueError as e: if cube._beam is None: cube = cube.with_beam(Beam(1e-10 * u.arcsec)).to(self.unit_for_channel_map) else: raise ValueError(e) central_channel = cube.closest_spectral_channel(velocity_offset) # downsampled: SpectralCube = cube.downsample_axis(4, axis=0).with_spectral_unit(u.km / u.s) downsampled = cube[central_channel - nx * ny // 2: central_channel + nx * ny // 2 + 1] maxval = round(0.9 * downsampled.max().value, 1) if symnorm: norm = Normalize(-maxval, maxval) else: norm = Normalize(0, maxval) aspect_ratio = downsampled.shape[2] / float(downsampled.shape[1]) fig_smallest_dim_inches = 5 gridratio = ny / float(nx) * aspect_ratio if gridratio > 1: ysize = fig_smallest_dim_inches * gridratio xsize = fig_smallest_dim_inches else: xsize = fig_smallest_dim_inches * gridratio ysize = fig_smallest_dim_inches fig: Figure = plt.figure(figsize=(xsize, ysize)) gs = GridSpec(nrows=ny, ncols=nx, hspace=0.0, wspace=0.0, figure=fig) ax = None window, window_unit = window.value, window.unit window_half = round(window * 0.6, ndigits=-2) for i in range(ny): for j in range(nx): ax = fig.add_subplot( gs[i, j], sharex=ax, sharey=ax ) im = ax.imshow( downsampled[i * nx + j].value, cmap=cmap, norm=norm, origin="lower", extent=[-window, window, -window, window], ) self.finalize_plot(ax, downsampled, window) this_spectral_coordinate = downsampled.spectral_axis[i * nx + j] ax.text( 0.5, 0.9, f"{this_spectral_coordinate.value: .1f} {this_spectral_coordinate.unit.to_string('latex_inline')}", horizontalalignment='center', verticalalignment='center', transform=ax.transAxes, fontsize="small" ) if self.cloud is not None: velocity = downsampled.spectral_axis[i * nx + j] if self.cloud.velocity_min < velocity < self.cloud.velocity_max: ax.scatter(0.5, 0.5, marker='x', color='red', alpha=0.2, transform=ax.transAxes, s=2000) if i != ny - 1: plt.setp(ax.get_xticklabels(), visible=False) else: ax.set_xticks([-window_half, 0, window_half]) if j != 0: plt.setp(ax.get_yticklabels(), visible=False) else: ax.set_yticks([-window_half, 0, window_half]) if i == ny - 1 and j == 0: ax.set_xlabel(f"[{window_unit.to_string('latex_inline')}]") ax.set_ylabel(f"[{window_unit.to_string('latex_inline')}]", labelpad=-18) else: cax = fig.add_axes([0.90, 0.1, 0.03, 0.8]) cb = fig.colorbar(im, cax=cax) cb.ax.set_xlabel(f"\n[{downsampled.unit.to_string('latex')}]") gs.update(left=0.1, right=0.9, top=0.9, bottom=0.1) fig.suptitle(f"{self.object_name} {chemical_names.from_string(self.line.molecule)}") return fig
class RadMCVisualize (folder: str | os.PathLike = '.',
line: Line = None)-
Expand source code
@dataclass class RadMCVisualize: folder: PathLike = '.' line: Line = None def channel_map(self) -> None: logging.warning("Deprecated") for file in sorted(glob.glob(os.path.join(self.folder, "*image.out"))): im = radmc3dPy.image.readImage(fname=file) normalizer = Normalize(vmin=im.image.min(), vmax=im.image.max()) nrow = 5 ncol = 8 fig, axes = plt.subplots( nrow, ncol, sharey='row', sharex='col', gridspec_kw=dict(wspace=0.0, hspace=0.0, top=1. - 0.5 / (nrow + 1), bottom=0.5 / (nrow + 1), left=0.5 / (ncol + 1), right=1 - 0.5 / (ncol + 1)), figsize=(ncol + 1, nrow + 1) ) for i, ax in enumerate(axes.flatten()): ax.imshow(im.image[:, :, i], norm=normalizer) axes_f = axes.flatten() cbaxes = fig.add_axes( [axes_f[-1].figbox.x1, axes_f[-1].figbox.y0, 0.1 * (axes_f[-1].figbox.x1 - axes_f[-1].figbox.x0), axes_f[-1].figbox.y1 - axes_f[-1].figbox.y0] ) plt.subplots_adjust(hspace=0.01, wspace=0.01) cbim = axes_f[-1].images[0] clbr = fig.colorbar(cbim, cax=cbaxes) fig.suptitle(file) fig.savefig(file + ".png")
RadMCVisualize(folder: Union[str, os.PathLike] = '.', line: diskchef.lamda.line.Line = None)
Instance variables
var folder : str | os.PathLike
var line : Line
Methods
def channel_map(self) ‑> None
-
Expand source code
def channel_map(self) -> None: logging.warning("Deprecated") for file in sorted(glob.glob(os.path.join(self.folder, "*image.out"))): im = radmc3dPy.image.readImage(fname=file) normalizer = Normalize(vmin=im.image.min(), vmax=im.image.max()) nrow = 5 ncol = 8 fig, axes = plt.subplots( nrow, ncol, sharey='row', sharex='col', gridspec_kw=dict(wspace=0.0, hspace=0.0, top=1. - 0.5 / (nrow + 1), bottom=0.5 / (nrow + 1), left=0.5 / (ncol + 1), right=1 - 0.5 / (ncol + 1)), figsize=(ncol + 1, nrow + 1) ) for i, ax in enumerate(axes.flatten()): ax.imshow(im.image[:, :, i], norm=normalizer) axes_f = axes.flatten() cbaxes = fig.add_axes( [axes_f[-1].figbox.x1, axes_f[-1].figbox.y0, 0.1 * (axes_f[-1].figbox.x1 - axes_f[-1].figbox.x0), axes_f[-1].figbox.y1 - axes_f[-1].figbox.y0] ) plt.subplots_adjust(hspace=0.01, wspace=0.01) cbim = axes_f[-1].images[0] clbr = fig.colorbar(cbim, cax=cbaxes) fig.suptitle(file) fig.savefig(file + ".png")