Source code for rascal.util

import numpy as np
from scipy.optimize import curve_fit
from numpy import exp
import pkg_resources


[docs]def get_vapour_pressure(temperature): """ Appendix A.I of https://emtoolbox.nist.gov/Wavelength/Documentation.asp """ K1 = 1.16705214528E+03 K2 = -7.24213167032E+05 K3 = -1.70738469401E+01 K4 = 1.20208247025E+04 K5 = -3.23255503223E+06 K6 = 1.49151086135E+01 K7 = -4.82326573616E+03 K8 = 4.05113405421E+05 K9 = -2.38555575678E-01 K10 = 6.50175348448E+02 omega = temperature + K9 / (temperature - K10) A = omega**2. + K1 * omega + K2 B = K3 * omega**2. + K4 * omega + K5 C = K6 * omega**2. + K7 * omega + K8 X = -B + np.sqrt(B**2. - 4 * A * C) vapour_pressure = 10**6. * (2. * C / X)**4. return vapour_pressure
[docs]def get_vapour_partial_pressure(relative_humidity, vapour_pressure): """ Appendix A.II of https://emtoolbox.nist.gov/Wavelength/Documentation.asp """ partial_pressure = relative_humidity / 100. * vapour_pressure return partial_pressure
[docs]def edlen_refraction(wavelengths, temperature, pressure, vapour_partial_pressure): """ Appendix A.IV of https://emtoolbox.nist.gov/Wavelength/Documentation.asp """ A = 8342.54 B = 2406147. C = 15998. D = 96095.43 E = 0.601 F = 0.00972 G = 0.003661 S = 1. / np.array(wavelengths)**2. n_s = 1. + 1E-8 * (A + B / (130 - S) + C / (38.9 - S)) X = (1. + 1E-8 * (E - F * (temperature - 273.15)) * pressure) / (1. + G * (temperature - 273.15)) n_tp = 1. + pressure * (n_s - 1.) * X / D n = n_tp - 1E-10 * (292.75 / temperature) * ( 3.7345 - 0.0401 * S) * vapour_partial_pressure return n
[docs]def vacuum_to_air_wavelength(wavelengths, temperature=273.15, pressure=101325, relative_humidity=0): """ The conversion follows the Modified Edlén Equations https://emtoolbox.nist.gov/Wavelength/Documentation.asp pressure drops by ~10% per 1000m above sea level temperature depends heavily on the location relative humidity is between 0-100, depends heavily on the location Parameters ---------- wavelengths: float or numpy.array Wavelengths in vacuum temperature: float In unit of Kelvin pressure: float In unit of Pa relative_humidity: float Unitless in percentage (i.e. 0 - 100) Returns ------- air wavelengths: float or numpy.array The wavelengths in air given the condition """ vapour_pressure = get_vapour_pressure(temperature) vapour_partial_pressure = get_vapour_partial_pressure( relative_humidity, vapour_pressure) return np.array(wavelengths) / edlen_refraction( wavelengths, temperature, pressure, vapour_partial_pressure)
[docs]def filter_wavelengths(lines, min_atlas_wavelength, max_atlas_wavelength): """ Filters a wavelength list to a minimum and maximum range. Parameters ---------- lines: list List of input wavelengths min_atlas_wavelength: int Min wavelength, Ansgtrom max_atlas_wavelength: int Max wavelength, Angstrom Returns ------- lines: list Filtered wavelengths within specified range limit """ wavelengths = lines[:, 1].astype(np.float32) wavelength_mask = ( (wavelengths >= min_atlas_wavelength) & (wavelengths <= max_atlas_wavelength)) return lines[wavelength_mask]
[docs]def filter_separation(wavelengths, min_separation=0): """ Filters a wavelength list by a separation threshold. Parameters ---------- wavelengths: list List of input wavelengths min_separation: int Separation threshold, Ansgtrom Returns ------- distance_mask: list Mask of values which satisfy the separation criteria """ left_dists = np.zeros_like(wavelengths) left_dists[1:] = wavelengths[1:] - wavelengths[:-1] right_dists = np.zeros_like(wavelengths) right_dists[:-1] = wavelengths[1:] - wavelengths[:-1] distances = np.minimum(right_dists, left_dists) distances[0] = right_dists[0] distances[-1] = left_dists[-1] distance_mask = np.abs(distances) >= min_separation return distance_mask
[docs]def filter_intensity(lines, min_intensity=0): """ Filters a line list by an intensity threshold Parameters ---------- lines: list[tuple (str, float, float)] A list of input lines where the 2nd parameter is intensity min_intensity: int Intensity threshold Returns ------- lines: list Filtered line list """ out = [] for line in lines: _, _, intensity = line if float(intensity) >= min_intensity: out.append(True) else: out.append(False) return np.array(out).astype(bool)
[docs]def load_calibration_lines(elements=[], min_atlas_wavelength=3000, max_atlas_wavelength=15000, min_intensity=10, min_distance=10, vacuum=False, pressure=101325., temperature=273.15, relative_humidity=0.): """ Load calibration lines from the standard NIST atlas. Rascal provides a cleaned set of NIST lines that can be used for general purpose calibration. It is recommended however that for repeated and robust calibration, the user should specify an instrument-specific atlas. Provide a wavelength range suitable to your calibration source. You can also specify a minimum intensity that corresponds to the values listed in the NIST tables. If you want air wavelengths (default), you can provide atmospheric conditions for your system. In most cases the default values of standard temperature and pressure should be sufficient. Parameters ---------- elements: list List of short element names, e.g. He as per NIST min_atlas_wavelength: int Minimum wavelength to search, Angstrom max_atlas_wavelength: int Maximum wavelength to search, Angstrom min_intensity: int Minimum intensity to search, per NIST max_intensity: int Maximum intensity to search, per NIST vacuum: bool Return vacuum wavelengths pressure: float Atmospheric pressure, Pascal temperature: float Temperature in Kelvin, default room temp relative_humidity: float Relative humidity, percent Returns ------- out: list Emission lines corresponding to the parameters specified """ if isinstance(elements, str): elements = [elements] # Element, wavelength, intensity file_path = pkg_resources.resource_filename('rascal', 'arc_lines/nist_clean.csv') lines = np.loadtxt(file_path, delimiter=',', dtype=">U12") # Mask elements mask = [(li[0] in elements) for li in lines] lines = lines[mask] # Filter wavelengths lines = filter_wavelengths(lines, min_atlas_wavelength, max_atlas_wavelength) # Filter intensities if min_intensity > 0: intensity_mask = filter_intensity(lines, min_intensity) else: intensity_mask = np.ones_like(lines[:, 0]).astype(bool) elements = lines[:, 0][intensity_mask] wavelengths = lines[:, 1][intensity_mask].astype('float32') intensities = lines[:, 2][intensity_mask].astype('float32') # Calculate peak separation if min_distance > 0: distance_mask = filter_separation(wavelengths, min_distance) else: distance_mask = np.ones_like(wavelengths.astype(bool)) elements = elements[distance_mask] wavelengths = wavelengths[distance_mask] intensities = intensities[distance_mask] # Vacuum to air conversion if not vacuum: wavelengths = vacuum_to_air_wavelength(wavelengths, temperature, pressure, relative_humidity) return elements, wavelengths, intensities
[docs]def gauss(x, a, x0, sigma): """ 1D Gaussian Parameters ---------- x: value or values to evaluate the Gaussian at a: float Magnitude x0: float Gaussian centre sigma: float Standard deviation (spread) Returns ------- out: list The Gaussian function evaluated at provided x """ return a * exp(-(x - x0)**2 / (2 * sigma**2 + 1e-9))
[docs]def refine_peaks(spectrum, peaks, window_width=10, distance=None): """ Refine peak locations in a spectrum from a set of initial estimates. This function attempts to fit a Gaussian to each peak in the provided list. It returns a list of sub-pixel refined peaks. If two peaks are very close, they can be refined to the same location. In this case only one of the peaks will be returned - i.e. this function will return a unique set of peak locations. Parameters ---------- spectrum: list Input spectrum (list of intensities) peaks: list A list of peak locations in pixels window_width: int Size of window to consider in fit either side of initial peak location Returns ------- refined_peaks: list A list of refined peak locations """ refined_peaks = [] spectrum = np.array(spectrum) length = len(spectrum) for peak in peaks: y = spectrum[max(0, int(peak) - window_width):min(int(peak) + window_width, length)] y /= np.nanmax(y) x = np.arange(len(y)) mask = np.isfinite(y) & ~np.isnan(y) n = np.sum(mask) if n == 0: continue mean = np.sum(x * y) / n sigma = np.sum(y * (x - mean)**2) / n try: popt, _ = curve_fit(gauss, x[mask], y[mask], p0=[1, mean, sigma]) height, centre, _ = popt if height < 0: continue if centre > len(spectrum) or centre < 0: continue refined_peaks.append(peak - window_width + centre) except RuntimeError: continue refined_peaks = np.array(refined_peaks) mask = (refined_peaks > 0) & (refined_peaks < len(spectrum)) # Remove peaks that are within rounding errors from each other from the # curve_fit distance_mask = np.isclose(refined_peaks[:-1], refined_peaks[1:]) distance_mask = np.insert(distance_mask, 0, False) return refined_peaks[mask & ~distance_mask]
def _derivative(p): """ Compute the derivative of a polynomial function. Parameters ---------- p: list Polynomial coefficients, in increasing order (e.g. 0th coefficient first) Returns ------- derv: list Derivative coefficients, i * p[i] """ derv = [] for i in range(1, len(p)): derv.append(i * p[i]) return derv