diff --git a/stingray/simulator/__init__.py b/stingray/simulator/__init__.py index 5b5c5a490..1d6a32dae 100644 --- a/stingray/simulator/__init__.py +++ b/stingray/simulator/__init__.py @@ -9,4 +9,4 @@ # For egg_info test builds to pass, put package imports here. if not _ASTROPY_SETUP_: - from stingray.simulator.simulator import Simulator + from stingray.simulator.simulator import Simulator, CrossSpectrumSimulator diff --git a/stingray/simulator/simulator.py b/stingray/simulator/simulator.py index 61645a265..2b9825ce7 100644 --- a/stingray/simulator/simulator.py +++ b/stingray/simulator/simulator.py @@ -8,8 +8,10 @@ from stingray import utils from stingray import Lightcurve from stingray import AveragedPowerspectrum +from stingray import AveragedCrossspectrum +from typing import Iterable, Tuple, Optional, Union, Callable, Literal -__all__ = ["Simulator"] +__all__ = ["Simulator", "CrossSpectrumSimulator"] class Simulator(object): @@ -632,3 +634,920 @@ def write(self, filename, fmt="pickle"): pickle.dump(self, fobj) else: raise KeyError("Format not understood.") + + def get_refftfreq(self) -> np.ndarray: + """ + Calculate the positive frequencies for the real-valued FFT of a signal. + + This method computes the frequencies corresponding to the positive + half of the discrete Fourier Transform (DFT) of a signal, based on + the parameters of the red noise and the number of samples. + + Returns: + np.ndarray: An array of positive frequency values corresponding + to the real FFT of the signal. + """ + + return np.fft.rfftfreq(self.red_noise * self.N, d=self.dt)[1:] + + +class CrossSpectrumSimulator(Simulator): + r"""Simulate pairs of correlated light curves with arbitrary coherence and + phase lag distributions. + + Extends :class:`Simulator` with the correlated Timmer-Koenig method from + Larner, Nowak, & Wilms (2026), which generates two light curves whose + cross-spectral properties (coherence :math:`\gamma^2` and phase lag + :math:`\phi` or cospectrum :math:`\mathrm{Re}[C]` and quadrature spectrum + :math:`\mathrm{Im}[C]`) match user-supplied models at every Fourier frequency. + + Parameters + ---------- + dt : float + Time resolution (sampling interval) of the simulated light curves in + seconds. + N : int + Number of time bins in each simulated light curve. + mean : float + Mean count rate of the simulated light curves in counts/s. + rms : float or tuple of float + Fractional root-mean-square variability of the output light curves. + Must satisfy ``rms <= 1``. If a tuple ``(rms1, rms2)`` is given, + independent fractional RMS values are applied to the reference and + dependent light curves, respectively. + err : float, optional + Constant error bar to attach to every time bin. Default is 0. + red_noise : int, optional + Oversampling factor used to mitigate red-noise leakage: the internal + time series is generated at ``red_noise * N`` bins and a random + segment of length ``N`` is extracted. Must be 1 for + ``CrossSpectrumSimulator`` (values > 1 raise + :exc:`NotImplementedError`). Default is 1. + random_state : int or numpy.random.RandomState, optional + Seed or random-state object for reproducible simulations. + Default is ``None`` (unseeded). + tstart : float, optional + Start time of the output light curves in seconds. Default is 0. + poisson : bool, optional + If ``True``, draw final counts from a Poisson distribution so that + the light curves contain non-negative integer counts. Default is + ``False``. + + Attributes + ---------- + dt : float + Time resolution of the simulated light curves in seconds. + N : int + Number of time bins in each simulated light curve. + mean : float + Mean count rate in counts/s. + rms : float or tuple of (float, float) + Fractional RMS variability. Stored as a tuple when a per-band pair + was supplied at construction time. + red_noise : int + Oversampling factor (always 1 for this class). + tstart : float + Start time of the output light curves in seconds. + time : numpy.ndarray + Array of time bin centres, ``dt * arange(N) + tstart``. + err : float + Constant error bar on each time bin. + poisson : bool + Whether Poisson noise is applied to the output counts. + channels : list of tuple + Energy-channel store inherited from :class:`Simulator`. Each entry is + a ``(channel_name, Lightcurve)`` pair. Note that ``simulate_channel`` + is disabled on this class; use ``CS_simulate`` directly. + random_state : numpy.random.RandomState + Random-state object used for all stochastic draws. + + Methods + ------- + CS_simulate(pds1, pds2[, params, lag, coh, cospec, quadspec, ...]) + Simulate a correlated pair of light curves from power-spectral and + cross-spectral model inputs. + crossspectrum(lc1, lc2[, seg_size, norm]) + Compute the averaged cross spectrum of two light curves (static + method; can also be called on an instance). + get_refftfreq() + Return the positive FFT frequencies at which PSD and lag models are + evaluated, of length ``N // 2``. + simulate(\*args) + Inherited single-band Timmer-Koenig simulator; the + ``_extract_and_scale`` override ensures compatibility with the tuple + ``rms`` attribute. + simulate_channel(channel, \*args) + Disabled. Raises :exc:`NotImplementedError`; use ``CS_simulate`` + to generate correlated light curve pairs. + + Raises + ------ + NotImplementedError + If ``red_noise > 1`` is requested, or if ``simulate_channel`` is + called (use ``CS_simulate`` instead). + AssertionError + If any value in a tuple ``rms`` exceeds 1. + + See Also + -------- + Simulator : Parent class providing single-band simulation and + channel-management utilities. + + References + ---------- + Larner, S. R., Nowak, M. A., & Wilms, J. 2026 (under review) + Timmer, J., & Koenig, M. 1995, A&A, 300, 707 + + Examples + -------- + Simulate two light curves with 80 % coherence and a 0.5 rad phase lag: + + >>> import numpy as np + >>> from stingray.simulator import CrossSpectrumSimulator + >>> sim = CrossSpectrumSimulator(N=1024, mean=100.0, dt=0.1, rms=0.1, + ... random_state=42) + >>> lc1, lc2 = sim.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=0.8) + >>> lc1.n == 1024 + True + """ + + def __init__(self, *args, **kwargs): + # rms may be a tuple (rms1, rms2); parent only accepts a scalar. + # Extract it, validate, then pass the max to satisfy the parent assertion. + rms_tuple = None + if "rms" in kwargs and isinstance(kwargs["rms"], tuple): + rms_tuple = kwargs["rms"] + rms1, rms2 = rms_tuple + assert rms1 <= 1 and rms2 <= 1, "Fractional rms must be less than 1." + kwargs = {**kwargs, "rms": max(rms1, rms2)} + elif len(args) > 3 and isinstance(args[3], tuple): + rms_tuple = args[3] + rms1, rms2 = rms_tuple + assert rms1 <= 1 and rms2 <= 1, "Fractional rms must be less than 1." + args = args[:3] + (max(rms1, rms2),) + args[4:] + + super().__init__(*args, **kwargs) + + if rms_tuple is not None: + self.rms = rms_tuple # Restore original tuple after parent sets scalar + + if self.red_noise != 1: + raise NotImplementedError("Red noise > 1 not implemented for cross spectral fitting") + + def simulate_channel(self, channel, *args): + raise NotImplementedError( + "simulate_channel is not supported by CrossSpectrumSimulator because it produces " + "uncorrelated single-band light curves. Use CS_simulate() to generate a correlated " + "pair of light curves instead." + ) + + def CS_simulate( + self, + pds1: Union[str, float, astropy.modeling.Model, Callable[[Iterable], Iterable]], + pds2: Optional[Union[str, float, astropy.modeling.Model, Callable[[Iterable], Iterable]]], + params: Optional[Union[list, dict]] = None, + lag: Optional[ + Union[str, float, astropy.modeling.Model, Callable[[Iterable], Iterable], Iterable] + ] = None, + coh: Optional[ + Union[str, float, astropy.modeling.Model, Callable[[Iterable], Iterable], Iterable] + ] = None, + cospec: Optional[Union[str, float, Callable[[Iterable], Iterable], Iterable]] = None, + quadspec: Optional[Union[str, float, Callable[[Iterable], Iterable], Iterable]] = None, + lag_params: Optional[Union[list, dict]] = None, + coh_params: Optional[Union[list, dict]] = None, + cospec_params: Optional[Union[list, dict]] = None, + quadspec_params: Optional[Union[list, dict]] = None, + ) -> Tuple[Lightcurve, Lightcurve]: + r"""Simulate two LightCurves from a power spectrum with a specified + phase lag and/or coherence distribution. + + Parameters + ---------- + pds1 : str or float or astropy.modeling.Model or callable or array-like + Shape of the power spectrum for the reference time series. + If str, a model name from ``stingray.simulator.models``. + If float, the index of a power-law power spectrum. + If ``astropy.modeling.Model``, the model is evaluated at each frequency. + If other callable, must have signature ``f(frequency) -> pds``. + If array-like, must give the PSD at each frequency in ``self.get_refftfreq()``. + pds2 : str or float or astropy.modeling.Model or callable or array-like, optional + Shape of the power spectrum for the dependent time series. + Accepts the same types as ``pds1``. If not given, defaults to ``pds1``. + params : list or dict, optional + Parameters for the predefined model when ``pds1`` or ``pds2`` is a string. + lag : str or float or astropy.modeling.Model or callable or array-like, optional + Phase lag spectrum in radians. If omitted, no phase lag is applied. + If float, a constant value in :math:`[-\pi, \pi]`. + If str, a model name from ``stingray.simulator.models``; supply + parameters via ``lag_params``. + If callable, must have signature ``f(frequency) -> lag``. + If array-like, must give the lag at each frequency in ``self.get_refftfreq()``. + coh : str or float or astropy.modeling.Model or callable or array-like, optional + Coherence spectrum. If omitted, no coherence modification is applied. + If float, a constant value in [0, 1]. + If str, a model name from ``stingray.simulator.models``; supply + parameters via ``coh_params``. + If callable, must have signature ``f(frequency) -> coherence``. + If array-like, must give the coherence at each frequency in ``self.get_refftfreq()``. + cospec : str or float or callable or array-like, optional + Real part of the cross spectrum (co-spectrum). Cannot be specified + together with ``coh`` or ``lag``. + If float, constant across all frequencies. + If str, a model name from ``stingray.simulator.models``; supply + parameters via ``cospec_params``. + If callable, must have signature ``f(frequency) -> cospec``. + If array-like, must give the value at each frequency in ``self.get_refftfreq()``. + quadspec : str or float or callable or array-like, optional + Imaginary part of the cross spectrum (quadrature spectrum). Cannot be + specified together with ``coh`` or ``lag``. + If float, constant across all frequencies. + If str, a model name from ``stingray.simulator.models``; supply + parameters via ``quadspec_params``. + If callable, must have signature ``f(frequency) -> quadspec``. + If array-like, must give the value at each frequency in ``self.get_refftfreq()``. + lag_params : list or dict, optional + Parameters for the predefined model when ``lag`` is a string. + coh_params : list or dict, optional + Parameters for the predefined model when ``coh`` is a string. + cospec_params : list or dict, optional + Parameters for the predefined model when ``cospec`` is a string. + quadspec_params : list or dict, optional + Parameters for the predefined model when ``quadspec`` is a string. + + Returns + ------- + lc1, lc2 : tuple of `~stingray.Lightcurve` + Reference and dependent light curves, respectively. + + Raises + ------ + ValueError + If both ``cospec``/``quadspec`` and ``coh``/``lag`` are specified, + or if a model string cannot be parsed. + """ + + if pds2 is None: + pds2 = pds1 + + use_coh = coh is not None + use_lag = lag is not None + use_cross_spectra = cospec is not None or quadspec is not None + + if use_cross_spectra and (use_coh or use_lag): + raise ValueError( + "Cannot specify both cospec/quadspec and coh/lag. " "Use one pair or the other." + ) + + w = self.get_refftfreq() + + # Inspect the input and generate... + # pds_distribution 1 + if isinstance(pds1, (float, int)): + pds_shape1 = self._make_powerlaw_pds(pds1, self.dt, self.red_noise * self.N) + + elif isinstance(pds1, str): + from stingray.simulator import models + + if not hasattr(models, pds1): + raise ValueError("Model string not defined") + + if isinstance(params, dict): + model = getattr(models, pds1)(**params) + pds_shape1 = model(w) + elif isinstance(params, list): + model_func = getattr(models, pds1) + pds_shape1 = model_func(w, params) + else: + raise ValueError("Params should be list or dictionary!") + + elif callable(pds1): + pds_shape1 = pds1(w) + else: + pds_shape1 = np.asarray(pds1) + + # pds distribution 2 + if isinstance(pds2, (float, int)): + pds_shape2 = self._make_powerlaw_pds(pds2, self.dt, self.red_noise * self.N) + + elif isinstance(pds2, str): + from stingray.simulator import models + + if not hasattr(models, pds2): + raise ValueError("Model string not defined") + + if isinstance(params, dict): + model = getattr(models, pds2)(**params) + pds_shape2 = model(w) + elif isinstance(params, list): + model_func = getattr(models, pds2) + pds_shape2 = model_func(w, params) + else: + raise ValueError("Params should be list or dictionary!") + + elif callable(pds2): + pds_shape2 = pds2(w) + else: + pds_shape2 = np.asarray(pds2) + + # Parse cospec distribution + cospec_shape = None + if cospec is not None: + if isinstance(cospec, (float, int)): + cospec_shape = np.ones_like(w) * cospec + elif isinstance(cospec, str): + from stingray.simulator import models + + if not hasattr(models, cospec): + raise ValueError("Model string not defined") + if isinstance(cospec_params, dict): + model = getattr(models, cospec)(**cospec_params) + cospec_shape = model(w) + elif isinstance(cospec_params, list): + model_func = getattr(models, cospec) + cospec_shape = model_func(w, cospec_params) + else: + raise ValueError("Params should be list or dictionary!") + elif isinstance(cospec, Iterable): + cospec_shape = cospec + else: + cospec_shape = cospec(w) + + # Parse quadspec distribution + quadspec_shape = None + if quadspec is not None: + if isinstance(quadspec, (float, int)): + quadspec_shape = np.ones_like(w) * quadspec + elif isinstance(quadspec, str): + from stingray.simulator import models + + if not hasattr(models, quadspec): + raise ValueError("Model string not defined") + if isinstance(quadspec_params, dict): + model = getattr(models, quadspec)(**quadspec_params) + quadspec_shape = model(w) + elif isinstance(quadspec_params, list): + model_func = getattr(models, quadspec) + quadspec_shape = model_func(w, quadspec_params) + else: + raise ValueError("Params should be list or dictionary!") + elif isinstance(quadspec, Iterable): + quadspec_shape = quadspec + else: + quadspec_shape = quadspec(w) + + # If neither lag nor coh nor cross spectra specified, simulate two independent light curves + if not use_lag and not use_coh and not use_cross_spectra: + if params is not None: + return (self.simulate(pds1, params), self.simulate(pds2, params)) + else: + return (self.simulate(pds1), self.simulate(pds2)) + + # Parse lag distribution + lag_shape = None + if use_lag: + if isinstance(lag, (float, int)): + lag_shape = np.ones_like(w) * lag + elif isinstance(lag, str): + from stingray.simulator import models + + if not hasattr(models, lag): + raise ValueError("Model string not defined") + if isinstance(lag_params, dict): + model = getattr(models, lag)(**lag_params) + lag_shape = model(w) + elif isinstance(lag_params, list): + model_func = getattr(models, lag) + lag_shape = model_func(w, lag_params) + else: + raise ValueError("Params should be list or dictionary!") + elif isinstance(lag, Iterable): + lag_shape = lag + else: + lag_shape = lag(w) + + # Parse coh distribution + coh_shape = None + if use_coh: + if isinstance(coh, (float, int)): + coh_shape = np.ones_like(w) * coh + elif isinstance(coh, str): + from stingray.simulator import models + + if not hasattr(models, coh): + raise ValueError("Model string not defined") + if isinstance(coh_params, dict): + model = getattr(models, coh)(**coh_params) + coh_shape = model(w) + elif isinstance(coh_params, list): + model_func = getattr(models, coh) + coh_shape = model_func(w, coh_params) + else: + raise ValueError("Params should be list or dictionary!") + elif isinstance(coh, Iterable): + coh_shape = coh + else: + coh_shape = coh(w) + + c1, c2 = self._correlated_timmerkoenig( + P1=pds_shape1, + P2=pds_shape2, + output_length=self.N * self.dt, + dt=self.dt, + mean=self.mean, + lag=lag_shape, + gamma=coh_shape, + red_noise=self.red_noise, + rms=self.rms, + poisson=self.poisson, + cospec=cospec_shape, + quadspec=quadspec_shape, + ) + + t = np.arange(len(c1)) * self.dt + + lc1 = Lightcurve(time=t, counts=c1, err=np.zeros_like(c1), dt=self.dt, skip_checks=True) + lc2 = Lightcurve(time=t, counts=c2, err=np.zeros_like(c2), dt=self.dt, skip_checks=True) + + return (lc1, lc2) + + @staticmethod + def crossspectrum( + lc1: Lightcurve, + lc2: Lightcurve, + seg_size: Optional[float] = None, + norm: Optional[Literal["frac", "abs", "leahy", "none"]] = "frac", + ) -> np.ndarray: + """ + Make a cross spectrum of the simulated light curves. + + Parameters + ---------- + lc1 : `~stingray.Lightcurve` or iterable of `~stingray.Lightcurve` + The reference light curve data to be Fourier-transformed. + lc2 : `~stingray.Lightcurve` or iterable of `~stingray.Lightcurve` + The dependent light curve data to be Fourier-transformed. + seg_size : float, optional + Segment size in seconds. Defaults to the full light curve length. + norm : str, optional + Normalization of the cross spectrum. One of ``'frac'``, ``'abs'``, + ``'leahy'``, or ``'none'``. Default is ``'frac'``. + + Returns + ------- + power : numpy.ndarray + Array of complex cross-spectral powers. + + Notes + ----- + ``lc1`` and ``lc2`` must have the same length. + """ + + # Following stingray convention by including this method + + if seg_size is None: + seg_size = lc1.tseg + + return AveragedCrossspectrum(lc1, lc2, seg_size, silent=True, norm=norm).power + + def _compute_transfer_function( + self, + gamma2: Union[float, np.ndarray[float]], + phi: Union[float, np.ndarray[float]], + P_X: Union[float, np.ndarray[float]], + P_Y: Union[float, np.ndarray[float]], + ) -> Union[complex, np.ndarray[complex]]: + r"""Compute the transfer function T from coherence and phase lag. + + Implements Equation 15 from Larner, Nowak, & Wilms (2026): + + .. math:: + + T = \sqrt{\frac{P_Y \gamma^2}{P_X}} \, e^{i\phi} + + Parameters + ---------- + gamma2 : float or numpy.ndarray + Coherence squared, :math:`\gamma^2`. Range: [0, 1]. + phi : float or numpy.ndarray + Phase lag in radians. Range: :math:`[-\pi, \pi]`. + P_X : float or numpy.ndarray + Power spectrum of the reference time series. + P_Y : float or numpy.ndarray + Power spectrum of the dependent time series. + + Returns + ------- + T : complex or numpy.ndarray + Complex transfer function relating the two time series in Fourier space. + + Notes + ----- + The transfer function encodes both the coherence (in its magnitude) and + the phase lag (in its argument). This formulation ensures proper + normalization of the power spectra. + + References + ---------- + Larner, S. R., Nowak, M. A., & Wilms, J. 2026 (under review) + """ + + magnitude = np.sqrt(P_Y * gamma2 / P_X) + return magnitude * np.exp(1j * phi) + + def _compute_normalization_constant( + self, + P_X: Union[float, np.ndarray[float]], + P_Y: Union[float, np.ndarray[float]], + T: Union[complex, np.ndarray[complex]], + ) -> Union[float, np.ndarray[float]]: + r"""Compute the normalization constant K for the incoherent component. + + Implements Equation 12 from Larner, Nowak, & Wilms (2026): + + .. math:: + + K = \sqrt{\frac{P_Y - P_X |T|^2}{2}} + + Parameters + ---------- + P_X : float or numpy.ndarray + Power spectrum of the reference time series. + P_Y : float or numpy.ndarray + Power spectrum of the dependent time series. + T : complex or numpy.ndarray + Transfer function (from ``_compute_transfer_function``). + + Returns + ------- + K : float or numpy.ndarray + Normalization constant for the incoherent component. + + Notes + ----- + This constant ensures that the dependent time series Y has the correct + power spectrum :math:`P_Y`. The factor of 2 accounts for the variance of + complex Gaussian random variables. + + References + ---------- + Larner, S. R., Nowak, M. A., & Wilms, J. 2026 (under review) + """ + + T_mag_squared = np.abs(T) ** 2 + + return np.sqrt(np.maximum(P_Y - P_X * T_mag_squared, 0.0) / 2) + + def _cross_spectra_to_coh_lag( + self, + cospec: Union[float, np.ndarray], + quadspec: Union[float, np.ndarray], + P_X: Union[float, np.ndarray], + P_Y: Union[float, np.ndarray], + ) -> Tuple[Union[float, np.ndarray], Union[float, np.ndarray]]: + r"""Convert co-spectrum and quadrature spectrum to coherence and phase lag. + + Parameters + ---------- + cospec : float or numpy.ndarray + Real part of the cross spectrum, :math:`\mathrm{Re}[C]`. + quadspec : float or numpy.ndarray + Imaginary part of the cross spectrum, :math:`\mathrm{Im}[C]`. + P_X : float or numpy.ndarray + Power spectrum of the reference time series. + P_Y : float or numpy.ndarray + Power spectrum of the dependent time series. + + Returns + ------- + gamma2 : float or numpy.ndarray + Coherence squared, + :math:`\gamma^2 = (\mathrm{Re}[C]^2 + \mathrm{Im}[C]^2) / (P_X P_Y)`. + phi : float or numpy.ndarray + Phase lag in radians, + :math:`\phi = \arctan2(\mathrm{Im}[C],\, \mathrm{Re}[C])`. + """ + + gamma2 = (cospec**2 + quadspec**2) / (P_X * P_Y) + phi = np.arctan2(quadspec, cospec) + return gamma2, phi + + def _invert_fft(self, x: Iterable[complex], mean: float, nbins: int) -> np.ndarray: + """Prepend the DC component and compute the inverse real FFT. + + Parameters + ---------- + x : array-like of complex + One-sided FFT coefficients at positive frequencies (excluding DC). + mean : float + Mean count rate; used to set the DC (zero-frequency) component. + nbins : int + Length of the output time series passed to ``numpy.fft.irfft``. + + Returns + ------- + counts : numpy.ndarray + Real-valued time series of length ``nbins``. + """ + f = np.hstack([mean * nbins, x]) + + return np.fft.irfft(f, n=nbins) + + def _extract_and_scale( + self, + long_lc: np.ndarray, + N: Optional[int] = None, + mean: Optional[float] = None, + red_noise: Optional[int] = None, + random_state=None, + rms: Optional[float] = None, + ): + """ + i) Make a random cut and extract a light curve of required + length. + + ii) Rescale light curve i) with zero mean and unit standard + deviation, and ii) user provided mean and rms (fractional + rms * mean) + + Overrides the parent method to accept explicit parameters + instead of relying solely on instance attributes. This allows + both light curves in a correlated pair to use the same random + extraction cut (via the same ``random_state`` seed) while + supporting per-band rms. + + Falls back to instance attributes when called via the parent's + ``_timmerkoenig`` path (i.e. when extra arguments are omitted). + + Parameters + ---------- + long_lc : numpy.ndarray + Simulated lightcurve of length 'N' times 'red_noise' + N : int, optional + Number of bins in output lightcurve. Defaults to ``self.N``. + mean : float, optional + Mean count rate of the output lightcurve. Defaults to ``self.mean``. + red_noise : int, optional + Red noise oversampling factor. Defaults to ``self.red_noise``. + random_state : int or numpy.random.RandomState, optional + Seed or state for the random extraction cut. Defaults to + ``self.random_state``. + rms : float, optional + Fractional RMS. Defaults to ``self.rms`` (scalar only). + + Returns + ------- + lc : numpy.ndarray + Normalized and extracted lightcurve of length 'N' + """ + if N is None: + N = self.N + if mean is None: + mean = self.mean + if red_noise is None: + red_noise = self.red_noise + if rms is None: + rms = self.rms if not isinstance(self.rms, tuple) else self.rms[0] + + random_state = utils.get_random_state(random_state) + + std = long_lc.std() + + if red_noise == 1: + lc = long_lc + else: + extract = random_state.randint(0, red_noise * N - N + 1) + lc = np.take(long_lc, range(extract, extract + N)) + + mean_lc = np.mean(lc) + + if mean == 0: + return (lc - mean_lc) / std * rms + else: + return (lc - mean_lc) / std * mean * rms + mean + + def _make_powerlaw_pds(self, index: float, dt: float, bins: int) -> np.ndarray: + """Construct a power-law PDS from a given spectral index. + + Parameters + ---------- + index : float + Spectral index of the power law (negative slope in log-log space). + dt : float + Time resolution in seconds. + bins : int + Total number of time bins (used to compute FFT frequencies). + + Returns + ------- + p : numpy.ndarray + Power-law PDS evaluated at the positive FFT frequencies. + """ + + w = np.fft.rfftfreq(bins, d=dt)[1:] + + p = np.power((1 / w), index) + + return p + + def _correlated_timmerkoenig( + self, + P1: np.ndarray, + output_length: float, + dt: float, + mean: Union[float, Tuple[float, float]], + lag: Optional[Union[float, np.ndarray[float]]] = None, + gamma: Optional[Union[float, np.ndarray[float]]] = None, + red_noise: Optional[int] = 1, + rms: Optional[Union[float, Tuple[float, float]]] = 0.1, + P2: Optional[np.ndarray] = None, + poisson: bool = False, + cospec: Optional[Union[float, np.ndarray[float]]] = None, + quadspec: Optional[Union[float, np.ndarray[float]]] = None, + ) -> Tuple[np.ndarray[float], np.ndarray[float]]: + r"""Simulate two correlated time series with arbitrary coherence and phase lag. + + Implements the method from Larner, Nowak, & Wilms (2026) using Equations 9, 10, + 12, and 15. The reference time series is generated using the Timmer-Koenig method, + and the dependent time series is constructed as a weighted sum of a coherent + component (derived from the reference via a transfer function) and an incoherent + component. + + Parameters + ---------- + P1 : numpy.ndarray + Power spectrum of the reference time series (P_X in paper notation). + output_length : float + Length of the output light curve in seconds. + dt : float + Time resolution of the output light curve in seconds. + mean : float + Mean count rate of the output light curves in counts/s. + lag : float or numpy.ndarray, optional + Phase lag spectrum in radians (:math:`\phi`). Range: :math:`[-\pi, \pi]`. + If float, constant across all frequencies. + If array, must have the same length as ``P1``. Default: 0. + gamma : float or numpy.ndarray, optional + Coherence squared spectrum (:math:`\gamma^2`). Range: [0, 1]. + If float, constant across all frequencies. + If array, must have the same length as ``P1``. Default: 1 (perfect coherence). + red_noise : int, optional + Red noise oversampling factor. Default: 1. + rms : float or tuple of float, optional + Fractional RMS variability. If tuple ``(rms1, rms2)``, different values + are used for the reference and dependent series. Default: 0.1. + P2 : numpy.ndarray, optional + Power spectrum of the dependent time series (P_Y). If ``None``, ``P2 = P1``. + Must have the same shape as ``P1`` if provided. + poisson : bool, optional + If ``True``, draw final counts from a Poisson distribution. Default: ``False``. + cospec : float or numpy.ndarray, optional + Real part of the cross spectrum, :math:`\mathrm{Re}[C]`. Cannot be specified + together with ``gamma``/``lag``. If float, constant across all frequencies. + quadspec : float or numpy.ndarray, optional + Imaginary part of the cross spectrum, :math:`\mathrm{Im}[C]`. Cannot be + specified together with ``gamma``/``lag``. If float, constant across all + frequencies. + + Returns + ------- + counts1 : numpy.ndarray + Reference time series. + counts2 : numpy.ndarray + Dependent time series, correlated with ``counts1``. + + Raises + ------ + ValueError + If ``P1`` and ``P2`` have different shapes, or if both + ``cospec``/``quadspec`` and ``gamma``/``lag`` are specified. + + Notes + ----- + The method follows these steps at each Fourier frequency: + + 1. Reference transform [Eq. 9]: + + .. math:: X = \sqrt{P_X / 2} \, (A_r + i B_r) + + 2. Transfer function [Eq. 15]: + + .. math:: T = \sqrt{\frac{P_Y \gamma^2}{P_X}} \, e^{i\phi} + + 3. Normalization constant [Eq. 12]: + + .. math:: K = \sqrt{\frac{P_Y - P_X |T|^2}{2}} + + 4. Dependent transform [Eq. 10]: + + .. math:: Y = K (H_r + i J_r) + T X + + where :math:`A_r, B_r, H_r, J_r` are independent standard normal random + variables. When :math:`\gamma^2 = 1` the incoherent component vanishes and + :math:`Y = T X`. + + References + ---------- + Larner, S. R., Nowak, M. A., & Wilms, J. 2026 (under review) + Timmer, J., & Koenig, M. 1995, A&A, 300, 707 + """ + + if P2 is None: + P2 = P1 + elif P1.shape != P2.shape: + raise ValueError("Both power spectra must have the same shape!") + + # Validate that cospec/quadspec and gamma/lag are not both specified + use_cross_spectra = cospec is not None or quadspec is not None + use_coh_lag = gamma is not None or lag is not None + if use_cross_spectra and use_coh_lag: + raise ValueError( + "Cannot specify both cospec/quadspec and gamma/lag. " "Use one pair or the other." + ) + + # Convert cospec/quadspec to gamma and lag + if use_cross_spectra: + pds_size = P1.size + if cospec is None: + cospec = np.zeros(pds_size) + elif isinstance(cospec, (float, int)): + cospec = np.ones(pds_size) * cospec + if quadspec is None: + quadspec = np.zeros(pds_size) + elif isinstance(quadspec, (float, int)): + quadspec = np.ones(pds_size) * quadspec + gamma, lag = self._cross_spectra_to_coh_lag(cospec, quadspec, P1, P2) + + pds_size = P1.size + + # Default: no phase lag + if lag is None: + lag = np.zeros(pds_size) + elif isinstance(lag, (float, int)): + lag = np.ones(pds_size) * lag + + # Default: perfect coherence + if gamma is None: + gamma = np.ones(pds_size) + elif isinstance(gamma, (float, int)): + gamma = np.ones(pds_size) * gamma + + randint = self.random_state.randint(low=0, high=10000) + + N = int(output_length / dt) + long_N = int(output_length * red_noise / dt) + + # Generate random variables (one set per frequency) + Ar = self.random_state.normal(size=pds_size) + Br = self.random_state.normal(size=pds_size) + Hr = self.random_state.normal(size=pds_size) + Jr = self.random_state.normal(size=pds_size) + + # Equation 9: Reference Fourier transform + # X = sqrt(P_X/2) * (A_r + i*B_r) + X = np.sqrt(P1 / 2) * (Ar + 1j * Br) + + # Equation 15: Transfer function (includes phase lag) + # T = sqrt(P_Y * γ² / P_X) * exp(i*φ) + T = self._compute_transfer_function(gamma2=gamma, phi=lag, P_X=P1, P_Y=P2) + + # Equation 12: Normalization constant for incoherent component + # K = sqrt((P_Y - P_X*|T|²) / 2) + K = self._compute_normalization_constant(P_X=P1, P_Y=P2, T=T) + + # Equation 10: Dependent Fourier transform + # Y = K*(H_r + i*J_r) + T*X + Y = K * (Hr + 1j * Jr) + T * X + + # Handle separate RMS for each band if provided as tuple + if isinstance(rms, tuple): + rms1, rms2 = rms + else: + rms1 = rms2 = rms + + # Inverse FFT to get time series + counts1 = self._invert_fft(X, mean=mean, nbins=long_N) + counts1 = self._extract_and_scale( + long_lc=counts1, + N=N, + red_noise=red_noise, + rms=rms1, + random_state=randint, + mean=mean, + ) + + counts2 = self._invert_fft(Y, mean=mean, nbins=long_N) + counts2 = self._extract_and_scale( + long_lc=counts2, + N=N, + red_noise=red_noise, + rms=rms2, + random_state=randint, + mean=mean, + ) + + if poisson: + counts1 = self.random_state.poisson(counts1) + counts2 = self.random_state.poisson(counts2) + + return (counts1, counts2) diff --git a/stingray/simulator/tests/test_simulator.py b/stingray/simulator/tests/test_simulator.py index 053f37b06..10860452b 100644 --- a/stingray/simulator/tests/test_simulator.py +++ b/stingray/simulator/tests/test_simulator.py @@ -6,7 +6,7 @@ import pytest import astropy.modeling.models from stingray import Lightcurve, Crossspectrum, sampledata, Powerspectrum -from stingray.simulator import Simulator +from stingray.simulator import Simulator, CrossSpectrumSimulator from stingray.simulator import models _H5PY_INSTALLED = True @@ -612,3 +612,693 @@ def test_io_with_unsupported_format(self): with pytest.raises(KeyError): sim.read("sim.pickle", fmt="hdf5") os.remove("sim.pickle") + + +class TestCrossSpectrumSimulator(object): + @classmethod + def setup_class(cls): + cls.N = 2048 + cls.mean = 100.0 + cls.dt = 0.1 + cls.rms = 0.1 + cls.sim = CrossSpectrumSimulator( + N=cls.N, mean=cls.mean, dt=cls.dt, rms=cls.rms, random_state=42 + ) + + # --- Construction and validation --- + + def test_simulate_channel_raises(self): + """simulate_channel is disabled; users must use CS_simulate instead.""" + with pytest.raises(NotImplementedError): + self.sim.simulate_channel("3-10keV", 2.0) + + def test_red_noise_gt1_raises(self): + """CrossSpectrumSimulator does not support red_noise > 1.""" + with pytest.raises(NotImplementedError): + CrossSpectrumSimulator( + N=self.N, mean=self.mean, dt=self.dt, rms=self.rms, red_noise=2 + ) + + def test_rms_tuple_keyword(self): + """rms tuple passed as keyword argument is stored correctly.""" + sim = CrossSpectrumSimulator(N=self.N, mean=self.mean, dt=self.dt, rms=(0.1, 0.2)) + assert sim.rms == (0.1, 0.2) + + def test_rms_tuple_positional(self): + """rms tuple passed as positional argument is stored correctly.""" + sim = CrossSpectrumSimulator(self.dt, self.N, self.mean, (0.1, 0.2)) + assert sim.rms == (0.1, 0.2) + + def test_rms_tuple_invalid_raises(self): + """rms tuple with any value > 1 is rejected.""" + with pytest.raises(AssertionError): + CrossSpectrumSimulator(N=self.N, mean=self.mean, dt=self.dt, rms=(0.1, 1.5)) + + # --- Return type and shape --- + + def test_cs_simulate_returns_tuple_of_lightcurves(self): + """CS_simulate returns a 2-tuple of Lightcurve objects.""" + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=0.8) + assert isinstance(lc1, Lightcurve) + assert isinstance(lc2, Lightcurve) + + def test_cs_simulate_output_length(self): + """Both output light curves have length N.""" + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=0.8) + assert lc1.n == self.N + assert lc2.n == self.N + + # --- Error handling --- + + def test_cs_simulate_lag_and_cospec_raises(self): + """Combining lag with cospec raises ValueError.""" + with pytest.raises(ValueError): + self.sim.CS_simulate(pds1=2.0, pds2=None, lag=0.5, cospec=0.5) + + def test_cs_simulate_coh_and_quadspec_raises(self): + """Combining coh with quadspec raises ValueError.""" + with pytest.raises(ValueError): + self.sim.CS_simulate(pds1=2.0, pds2=None, coh=0.5, quadspec=0.5) + + # --- Fallback path (no cross-spectral constraints) --- + + def test_cs_simulate_no_cross_terms_returns_two_lightcurves(self): + """Without coh/lag/cospec/quadspec, two independent Lightcurves are returned.""" + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None) + assert isinstance(lc1, Lightcurve) + assert isinstance(lc2, Lightcurve) + assert lc1.n == self.N + assert lc2.n == self.N + + # --- Input variety --- + + def test_cs_simulate_array_lag(self): + """lag can be given as an array of per-frequency values.""" + lag_arr = np.ones_like(self.sim.get_refftfreq()) * 0.5 + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, lag=lag_arr) + assert lc1.n == self.N + + def test_cs_simulate_array_coh(self): + """coh can be given as an array of per-frequency values.""" + coh_arr = np.ones_like(self.sim.get_refftfreq()) * 0.8 + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, coh=coh_arr) + assert lc1.n == self.N + + def test_cs_simulate_callable_pds(self): + """pds1 can be a callable with signature f(frequency) -> pds.""" + pds_func = lambda f: np.power(1 / f, 2.0) + lc1, lc2 = self.sim.CS_simulate(pds1=pds_func, pds2=None, lag=0.5, coh=1.0) + assert lc1.n == self.N + + def test_cs_simulate_array_pds(self): + """pds1 can be an array of PSD values at each rfftfreq.""" + pds_arr = np.power(1 / self.sim.get_refftfreq(), 2.0) + lc1, lc2 = self.sim.CS_simulate(pds1=pds_arr, pds2=None, lag=0.5, coh=1.0) + assert lc1.n == self.N + + def test_cs_simulate_different_pds(self): + """pds1 and pds2 can be different power spectra.""" + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=1.0, lag=0.5, coh=0.8) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_cospec_quadspec(self): + """Cross spectrum can be specified directly via cospec and quadspec.""" + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, cospec=0.5, quadspec=0.3) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_rms_tuple(self): + """rms tuple applies different fractional RMS to each band.""" + sim = CrossSpectrumSimulator( + N=self.N, mean=self.mean, dt=self.dt, rms=(0.1, 0.2), random_state=42 + ) + lc1, lc2 = sim.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=0.8) + assert lc1.n == self.N + assert lc2.n == self.N + + # --- Poisson --- + + def test_cs_simulate_poisson_nonnegative_integer_counts(self): + """With poisson=True, both light curves have non-negative integer counts.""" + sim = CrossSpectrumSimulator( + N=self.N, mean=self.mean, dt=self.dt, rms=self.rms, poisson=True, random_state=42 + ) + lc1, lc2 = sim.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=0.8) + assert np.all(lc1.counts >= 0) + assert np.all(lc2.counts >= 0) + assert np.all(lc1.counts == lc1.counts.astype(int)) + assert np.all(lc2.counts == lc2.counts.astype(int)) + + # --- Reproducibility --- + + def test_cs_simulate_same_seed_reproducible(self): + """Two simulators with the same random_state produce identical output.""" + sim1 = CrossSpectrumSimulator( + N=self.N, mean=self.mean, dt=self.dt, rms=self.rms, random_state=42 + ) + lc1a, lc2a = sim1.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=0.8) + + sim2 = CrossSpectrumSimulator( + N=self.N, mean=self.mean, dt=self.dt, rms=self.rms, random_state=42 + ) + lc1b, lc2b = sim2.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=0.8) + + np.testing.assert_array_equal(lc1a.counts, lc1b.counts) + np.testing.assert_array_equal(lc2a.counts, lc2b.counts) + + def test_cs_simulate_different_seeds_differ(self): + """Different random seeds produce different light curves.""" + sim1 = CrossSpectrumSimulator( + N=self.N, mean=self.mean, dt=self.dt, rms=self.rms, random_state=42 + ) + lc1a, _ = sim1.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=0.8) + + sim2 = CrossSpectrumSimulator( + N=self.N, mean=self.mean, dt=self.dt, rms=self.rms, random_state=99 + ) + lc1b, _ = sim2.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=0.8) + + assert not np.allclose(lc1a.counts, lc1b.counts) + + # --- crossspectrum static method --- + + def test_crossspectrum_static_method(self): + """crossspectrum() can be called on the class without an instance.""" + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=1.0) + cs = CrossSpectrumSimulator.crossspectrum(lc1, lc2) + assert isinstance(cs, np.ndarray) + assert len(cs) == self.N // 2 - 1 + + def test_crossspectrum_instance_method(self): + """crossspectrum() can also be called on an instance.""" + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=1.0) + cs = self.sim.crossspectrum(lc1, lc2) + assert isinstance(cs, np.ndarray) + assert len(cs) == self.N // 2 - 1 + + # --- Parent simulate still works via _extract_and_scale override --- + + def test_parent_simulate_still_works(self): + """The overridden _extract_and_scale does not break the parent simulate().""" + lc = self.sim.simulate(2.0) + assert isinstance(lc, Lightcurve) + assert lc.n == self.N + + # --- Statistical correctness --- + + def test_cs_simulate_mean_and_rms(self): + """Simulated light curves have approximately the correct mean and fractional RMS.""" + nsim = 100 + sim = CrossSpectrumSimulator( + N=self.N, mean=self.mean, dt=self.dt, rms=self.rms, random_state=42 + ) + means1, means2, rms1_list, rms2_list = [], [], [], [] + for _ in range(nsim): + lc1, lc2 = sim.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=0.8) + means1.append(np.mean(lc1.counts)) + means2.append(np.mean(lc2.counts)) + rms1_list.append(np.std(lc1.counts) / np.mean(lc1.counts)) + rms2_list.append(np.std(lc2.counts) / np.mean(lc2.counts)) + + assert np.isclose(np.mean(means1), self.mean, rtol=0.01) + assert np.isclose(np.mean(rms1_list), self.rms, rtol=0.1) + assert np.isclose(np.mean(means2), self.mean, rtol=0.01) + assert np.isclose(np.mean(rms2_list), self.rms, rtol=0.1) + + def test_cs_simulate_phase_lag_recovery(self): + r"""With perfect coherence (:math:`\gamma^2 = 1`), the cross-spectral phase + equals the input lag exactly at every Fourier frequency for a single realisation. + + Derivation: with :math:`\gamma^2 = 1`, :math:`Y(f) = T(f) X(f)` where + :math:`T(f) = \sqrt{P_2/P_1} \, e^{i\phi}`. + The mean-subtraction and rescaling in ``_extract_and_scale`` divide each series + by a real scalar, so the phase of :math:`\overline{X} Y = |X|^2 T` is + preserved as :math:`\phi`. + """ + target_lag = 0.5 # radians + sim = CrossSpectrumSimulator( + N=self.N, mean=500.0, dt=self.dt, rms=0.3, random_state=42 + ) + lc1, lc2 = sim.CS_simulate(pds1=2.0, pds2=None, lag=target_lag, coh=1.0) + + x = lc1.counts - lc1.counts.mean() + y = lc2.counts - lc2.counts.mean() + # Exclude DC (index 0) and Nyquist (index -1) bins + X = np.fft.rfft(x)[1:-1] + Y = np.fft.rfft(y)[1:-1] + + phases = np.angle(np.conj(X) * Y) + np.testing.assert_allclose(phases, target_lag, atol=1e-6) + + def test_cs_simulate_coherence_recovery(self): + r"""Average coherence over many realisations converges to the target value. + + The coherence estimator + + .. math:: + + \hat{\gamma}^2(f) = \frac{|\langle C(f) \rangle|^2} + {\langle P_1(f) \rangle \langle P_2(f) \rangle} + + (averages over independent realisations) converges to :math:`\gamma^2` as + :math:`N_\mathrm{sim} \to \infty`. + """ + target_coh = 0.6 + N = 1024 + nsim = 300 + sim = CrossSpectrumSimulator(N=N, mean=100.0, dt=self.dt, rms=self.rms, random_state=0) + + nfreq = N // 2 # length of rfft(x)[1:] + sum_Cxy = np.zeros(nfreq, dtype=complex) + sum_Pxx = np.zeros(nfreq) + sum_Pyy = np.zeros(nfreq) + + for _ in range(nsim): + lc1, lc2 = sim.CS_simulate(pds1=2.0, pds2=None, lag=0.0, coh=target_coh) + x = lc1.counts - lc1.counts.mean() + y = lc2.counts - lc2.counts.mean() + X = np.fft.rfft(x)[1:] + Y = np.fft.rfft(y)[1:] + sum_Cxy += np.conj(X) * Y + sum_Pxx += np.abs(X) ** 2 + sum_Pyy += np.abs(Y) ** 2 + + coh_est = np.abs(sum_Cxy) ** 2 / (sum_Pxx * sum_Pyy) + assert np.abs(np.mean(coh_est) - target_coh) < 0.05 + + def test_cs_simulate_zero_coherence_uncorrelated(self): + r"""With coh=0 the two light curves are statistically uncorrelated. + + The average estimated coherence (computed over many independent realisations) + should converge to zero, because the incoherent component :math:`K(H + iJ)` + carries all of the power and is drawn from a fully independent random draw. + """ + nsim = 200 + N = 1024 + sim = CrossSpectrumSimulator(N=N, mean=100.0, dt=self.dt, rms=0.2, random_state=0) + + nfreq = N // 2 + sum_Cxy = np.zeros(nfreq, dtype=complex) + sum_Pxx = np.zeros(nfreq) + sum_Pyy = np.zeros(nfreq) + + for _ in range(nsim): + lc1, lc2 = sim.CS_simulate(pds1=2.0, pds2=None, coh=0.0) + x = lc1.counts - lc1.counts.mean() + y = lc2.counts - lc2.counts.mean() + X = np.fft.rfft(x)[1:] + Y = np.fft.rfft(y)[1:] + sum_Cxy += np.conj(X) * Y + sum_Pxx += np.abs(X) ** 2 + sum_Pyy += np.abs(Y) ** 2 + + coh_est = np.abs(sum_Cxy) ** 2 / (sum_Pxx * sum_Pyy) + assert np.mean(coh_est) < 0.05 + + def test_cs_simulate_perfect_coherence_same_pds_identical(self): + r"""With coh=1, lag=0, and pds1=pds2 the two light curves are numerically identical. + + Derivation: :math:`T = \sqrt{P_2 \gamma^2 / P_1} \, e^{i\phi} = 1` and + :math:`K = \sqrt{(P_2 - P_1 |T|^2) / 2} = 0`, so :math:`Y = X` exactly in + Fourier space. Both series share the same extraction cut and rescaling + parameters, so counts1 == counts2 to machine precision. + """ + sim = CrossSpectrumSimulator( + N=self.N, mean=self.mean, dt=self.dt, rms=self.rms, random_state=42 + ) + lc1, lc2 = sim.CS_simulate(pds1=2.0, pds2=None, lag=0.0, coh=1.0) + np.testing.assert_array_equal(lc1.counts, lc2.counts) + + def test_cs_simulate_zero_lag_zero_phase_different_pds(self): + r"""With coh=1, lag=0, and pds1 != pds2 the cross-spectrum phase is identically zero. + + With :math:`\gamma^2 = 1` and :math:`\phi = 0`: + :math:`T = \sqrt{P_2 / P_1}` (real positive at every bin). + Then :math:`\overline{X} Y = T |X|^2` is real positive, so angle = 0 exactly. + Using pds2 != pds1 ensures the light curves are not identical (non-trivial test). + """ + sim = CrossSpectrumSimulator( + N=self.N, mean=500.0, dt=self.dt, rms=0.3, random_state=42 + ) + lc1, lc2 = sim.CS_simulate(pds1=2.0, pds2=1.0, lag=0.0, coh=1.0) + x = lc1.counts - lc1.counts.mean() + y = lc2.counts - lc2.counts.mean() + X = np.fft.rfft(x)[1:-1] + Y = np.fft.rfft(y)[1:-1] + phases = np.angle(np.conj(X) * Y) + np.testing.assert_allclose(phases, 0.0, atol=1e-6) + + def test_cs_simulate_negative_lag_sign_convention(self): + r"""Negative lag produces a negative cross-spectrum phase (sign convention check). + + With :math:`\gamma^2 = 1`: :math:`Y = e^{i\phi} X` so + :math:`\angle(\overline{X} Y) = \phi` exactly. + This test verifies that negative :math:`\phi` is preserved faithfully. + """ + target_lag = -0.7 # radians + sim = CrossSpectrumSimulator( + N=self.N, mean=500.0, dt=self.dt, rms=0.3, random_state=42 + ) + lc1, lc2 = sim.CS_simulate(pds1=2.0, pds2=None, lag=target_lag, coh=1.0) + x = lc1.counts - lc1.counts.mean() + y = lc2.counts - lc2.counts.mean() + X = np.fft.rfft(x)[1:-1] + Y = np.fft.rfft(y)[1:-1] + phases = np.angle(np.conj(X) * Y) + np.testing.assert_allclose(phases, target_lag, atol=1e-6) + + def test_cs_simulate_frequency_dependent_lag_array_recovery(self): + r"""Frequency-dependent lag array is recovered exactly at :math:`\gamma^2 = 1`. + + With a flat PSD (index=0, :math:`T = e^{i\phi(f)}`), + :math:`\overline{X}(f) Y(f) = e^{i\phi(f)} |X(f)|^2` so the phase at each bin + equals :math:`\phi(f)` exactly. The lag array spans ``get_refftfreq()`` which has + N//2 entries; ``rfft[1:-1]`` gives N//2-1 entries (excluding Nyquist), matching + ``lag_arr[:-1]``. + """ + sim = CrossSpectrumSimulator( + N=self.N, mean=500.0, dt=self.dt, rms=0.3, random_state=42 + ) + w = sim.get_refftfreq() # N//2 elements + median_f = np.median(w) + # Step function: 0.3 rad below median frequency, 0.9 rad above + lag_arr = np.where(w < median_f, 0.3, 0.9) + + # Flat PSD so all bins have equal power (avoids near-zero high-freq bins) + lc1, lc2 = sim.CS_simulate(pds1=0.0, pds2=None, lag=lag_arr, coh=1.0) + x = lc1.counts - lc1.counts.mean() + y = lc2.counts - lc2.counts.mean() + # rfft has N//2+1 coefficients; [1:-1] drops DC and Nyquist → N//2-1 entries + X = np.fft.rfft(x)[1:-1] + Y = np.fft.rfft(y)[1:-1] + phases = np.angle(np.conj(X) * Y) + np.testing.assert_allclose(phases, lag_arr[:-1], atol=1e-6) + + def test_cs_simulate_rms_tuple_independent_per_band(self): + """rms tuple produces the correct, independent fractional RMS in each band. + + The mean fractional RMS of lc1 should converge to rms1 and lc2 to rms2, + and the two values should be measurably different. + """ + rms1_target, rms2_target = 0.1, 0.3 + nsim = 100 + sim = CrossSpectrumSimulator( + N=self.N, mean=self.mean, dt=self.dt, rms=(rms1_target, rms2_target), random_state=42 + ) + rms1_list, rms2_list = [], [] + for _ in range(nsim): + lc1, lc2 = sim.CS_simulate(pds1=2.0, pds2=None, lag=0.5, coh=0.8) + rms1_list.append(np.std(lc1.counts) / np.mean(lc1.counts)) + rms2_list.append(np.std(lc2.counts) / np.mean(lc2.counts)) + + assert np.isclose(np.mean(rms1_list), rms1_target, rtol=0.1) + assert np.isclose(np.mean(rms2_list), rms2_target, rtol=0.1) + # Verify the two values are actually different (not just equal to the same scalar) + assert not np.isclose(np.mean(rms1_list), np.mean(rms2_list), rtol=0.05) + + def test_cross_spectra_to_coh_lag_conversion(self): + r"""``_cross_spectra_to_coh_lag`` correctly implements :math:`\gamma^2` and + :math:`\phi` formulae. + + For cospec = quadspec = C and :math:`P_X = P_Y = P`: + + .. math:: + + \gamma^2 = \frac{C^2 + C^2}{P^2} = \frac{2 C^2}{P^2}, \qquad + \phi = \arctan2(C,\, C) = \frac{\pi}{4} + """ + C = np.sqrt(2.0) + P = np.ones(10) * 4.0 + cospec = np.ones(10) * C + quadspec = np.ones(10) * C + gamma2, phi = self.sim._cross_spectra_to_coh_lag(cospec, quadspec, P, P) + + expected_gamma2 = (C**2 + C**2) / (4.0 * 4.0) # 4 / 16 = 0.25 + expected_phi = np.pi / 4 + + np.testing.assert_allclose(gamma2, expected_gamma2) + np.testing.assert_allclose(phi, expected_phi) + + # --- pds1/pds2 model string and astropy model input types --- + + def test_cs_simulate_pds1_astropy_model(self): + """pds1 accepts an astropy Model instance (callable).""" + mod = models.GeneralizedLorentz1D(x_0=10, fwhm=1.0, value=10.0, power_coeff=2) + lc1, lc2 = self.sim.CS_simulate(pds1=mod, pds2=None, lag=0.5, coh=0.8) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_pds1_string_list_params(self): + """pds1 accepts a model-name string with list params.""" + lc1, lc2 = self.sim.CS_simulate( + pds1="generalized_lorentzian", + pds2=None, + lag=0.5, + coh=0.8, + params=[0.3, 0.9, 0.6, 0.5], + ) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_pds1_string_dict_params(self): + """pds1 accepts a model-name string with dict params.""" + lc1, lc2 = self.sim.CS_simulate( + pds1="GeneralizedLorentz1D", + pds2=None, + lag=0.5, + coh=0.8, + params={"x_0": 10, "fwhm": 1.0, "value": 10.0, "power_coeff": 2}, + ) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_pds1_invalid_string_raises(self): + """pds1 as an unrecognised model string raises ValueError.""" + with pytest.raises(ValueError, match="Model string not defined"): + self.sim.CS_simulate(pds1="nonexistent_model", pds2=None, lag=0.5, coh=0.8, params=[1]) + + def test_cs_simulate_pds1_string_params_not_list_or_dict_raises(self): + """pds1 string model with params neither list nor dict raises ValueError.""" + with pytest.raises(ValueError, match="Params should be list or dictionary"): + self.sim.CS_simulate( + pds1="generalized_lorentzian", pds2=None, lag=0.5, coh=0.8, params=12345 + ) + + def test_cs_simulate_pds2_callable(self): + """pds2 accepts a callable with signature f(frequency) -> pds.""" + pds_func = lambda f: np.power(1 / f, 2.0) + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=pds_func, lag=0.5, coh=0.8) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_pds2_array(self): + """pds2 accepts an array of PSD values at each rfftfreq.""" + pds_arr = np.power(1 / self.sim.get_refftfreq(), 2.0) + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=pds_arr, lag=0.5, coh=0.8) + assert lc1.n == self.N + assert lc2.n == self.N + + # --- lag input types --- + + def test_cs_simulate_lag_callable(self): + """lag accepts a callable with signature f(frequency) -> lag.""" + lag_func = lambda f: np.ones_like(f) * 0.5 + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, lag=lag_func) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_lag_string_list_params(self): + """lag accepts a model-name string with list params.""" + lc1, lc2 = self.sim.CS_simulate( + pds1=2.0, + pds2=None, + lag="generalized_lorentzian", + lag_params=[3.0, 2.0, 0.5, 2.0], + ) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_lag_string_dict_params(self): + """lag accepts a model-name string with dict params.""" + lc1, lc2 = self.sim.CS_simulate( + pds1=2.0, + pds2=None, + lag="GeneralizedLorentz1D", + lag_params={"x_0": 3.0, "fwhm": 2.0, "value": 0.5, "power_coeff": 2}, + ) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_lag_invalid_string_raises(self): + """lag as an unrecognised model string raises ValueError.""" + with pytest.raises(ValueError, match="Model string not defined"): + self.sim.CS_simulate( + pds1=2.0, pds2=None, lag="nonexistent_model", lag_params=[1] + ) + + def test_cs_simulate_lag_string_bad_params_raises(self): + """lag string model with params neither list nor dict raises ValueError.""" + with pytest.raises(ValueError, match="Params should be list or dictionary"): + self.sim.CS_simulate( + pds1=2.0, pds2=None, lag="generalized_lorentzian", lag_params=12345 + ) + + # --- coh input types --- + + def test_cs_simulate_coh_callable(self): + """coh accepts a callable with signature f(frequency) -> coherence.""" + coh_func = lambda f: np.ones_like(f) * 0.8 + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, coh=coh_func) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_coh_string_list_params(self): + """coh accepts a model-name string with list params.""" + lc1, lc2 = self.sim.CS_simulate( + pds1=2.0, + pds2=None, + coh="generalized_lorentzian", + coh_params=[3.0, 2.0, 0.7, 2.0], + ) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_coh_string_dict_params(self): + """coh accepts a model-name string with dict params.""" + lc1, lc2 = self.sim.CS_simulate( + pds1=2.0, + pds2=None, + coh="GeneralizedLorentz1D", + coh_params={"x_0": 3.0, "fwhm": 2.0, "value": 0.7, "power_coeff": 2}, + ) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_coh_invalid_string_raises(self): + """coh as an unrecognised model string raises ValueError.""" + with pytest.raises(ValueError, match="Model string not defined"): + self.sim.CS_simulate( + pds1=2.0, pds2=None, coh="nonexistent_model", coh_params=[1] + ) + + def test_cs_simulate_coh_string_bad_params_raises(self): + """coh string model with params neither list nor dict raises ValueError.""" + with pytest.raises(ValueError, match="Params should be list or dictionary"): + self.sim.CS_simulate( + pds1=2.0, pds2=None, coh="generalized_lorentzian", coh_params=12345 + ) + + # --- cospec/quadspec input types --- + + def test_cs_simulate_cospec_only_float(self): + """cospec as a float without quadspec produces two valid light curves.""" + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, cospec=0.5) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_quadspec_only_float(self): + """quadspec as a float without cospec produces two valid light curves.""" + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, quadspec=0.3) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_cospec_callable(self): + """cospec accepts a callable f(frequency) -> cospec.""" + cospec_func = lambda f: np.ones_like(f) * 0.5 + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, cospec=cospec_func) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_cospec_array(self): + """cospec accepts an array of per-frequency values.""" + cospec_arr = np.ones_like(self.sim.get_refftfreq()) * 0.5 + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, cospec=cospec_arr) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_cospec_string_list_params(self): + """cospec accepts a model-name string with list params.""" + lc1, lc2 = self.sim.CS_simulate( + pds1=2.0, + pds2=None, + cospec="generalized_lorentzian", + cospec_params=[3.0, 2.0, 0.5, 2.0], + ) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_cospec_string_dict_params(self): + """cospec accepts a model-name string with dict params.""" + lc1, lc2 = self.sim.CS_simulate( + pds1=2.0, + pds2=None, + cospec="GeneralizedLorentz1D", + cospec_params={"x_0": 3.0, "fwhm": 2.0, "value": 0.5, "power_coeff": 2}, + ) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_cospec_invalid_string_raises(self): + """cospec as an unrecognised model string raises ValueError.""" + with pytest.raises(ValueError, match="Model string not defined"): + self.sim.CS_simulate( + pds1=2.0, pds2=None, cospec="nonexistent_model", cospec_params=[1] + ) + + def test_cs_simulate_cospec_string_bad_params_raises(self): + """cospec string model with params neither list nor dict raises ValueError.""" + with pytest.raises(ValueError, match="Params should be list or dictionary"): + self.sim.CS_simulate( + pds1=2.0, pds2=None, cospec="generalized_lorentzian", cospec_params=12345 + ) + + def test_cs_simulate_quadspec_callable(self): + """quadspec accepts a callable f(frequency) -> quadspec.""" + quadspec_func = lambda f: np.ones_like(f) * 0.3 + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, quadspec=quadspec_func) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_quadspec_array(self): + """quadspec accepts an array of per-frequency values.""" + quadspec_arr = np.ones_like(self.sim.get_refftfreq()) * 0.3 + lc1, lc2 = self.sim.CS_simulate(pds1=2.0, pds2=None, quadspec=quadspec_arr) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_quadspec_string_list_params(self): + """quadspec accepts a model-name string with list params.""" + lc1, lc2 = self.sim.CS_simulate( + pds1=2.0, + pds2=None, + quadspec="generalized_lorentzian", + quadspec_params=[3.0, 2.0, 0.3, 2.0], + ) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_quadspec_string_dict_params(self): + """quadspec accepts a model-name string with dict params.""" + lc1, lc2 = self.sim.CS_simulate( + pds1=2.0, + pds2=None, + quadspec="GeneralizedLorentz1D", + quadspec_params={"x_0": 3.0, "fwhm": 2.0, "value": 0.3, "power_coeff": 2}, + ) + assert lc1.n == self.N + assert lc2.n == self.N + + def test_cs_simulate_quadspec_invalid_string_raises(self): + """quadspec as an unrecognised model string raises ValueError.""" + with pytest.raises(ValueError, match="Model string not defined"): + self.sim.CS_simulate( + pds1=2.0, pds2=None, quadspec="nonexistent_model", quadspec_params=[1] + ) + + def test_cs_simulate_quadspec_string_bad_params_raises(self): + """quadspec string model with params neither list nor dict raises ValueError.""" + with pytest.raises(ValueError, match="Params should be list or dictionary"): + self.sim.CS_simulate( + pds1=2.0, pds2=None, quadspec="generalized_lorentzian", quadspec_params=12345 + )