Skip to content
Open
222 changes: 222 additions & 0 deletions stumpy/sdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@

from . import config

# `pyfftw` is an optional dependency: when it is importable, the FFTW-backed
# sliding dot product becomes available; otherwise the module falls back to
# the other implementations. The flag below records the outcome of the probe.
try:  # pragma: no cover
    import pyfftw

    PYFFTW_IS_AVAILABLE = True
except ImportError:  # pragma: no cover
    PYFFTW_IS_AVAILABLE = False


@njit(fastmath=config.STUMPY_FASTMATH_TRUE)
def _njit_sliding_dot_product(Q, T):
Expand Down Expand Up @@ -109,6 +116,221 @@ def _pocketfft_sliding_dot_product(Q, T):
return c2r(False, np.multiply(fft_2d[0], fft_2d[1]), n=next_fast_n)[m - 1 : n]


class _PYFFTW_SLIDING_DOT_PRODUCT:
    """
    A class to compute the sliding dot product using FFTW via pyfftw.

    This class uses FFTW (via pyfftw) to efficiently compute the sliding dot product
    between a query sequence, Q, and a time series, T. It preallocates arrays and caches
    FFTW objects to optimize repeated computations with similar-sized inputs.

    Parameters
    ----------
    max_n : int, default=2**20
        Maximum length to preallocate arrays for. This will be the size of the
        real-valued array. A complex-valued array of size `1 + (max_n // 2)`
        will also be preallocated. If inputs exceed this size, arrays will be
        reallocated to accommodate larger sizes.

    Attributes
    ----------
    real_arr : pyfftw.empty_aligned
        Preallocated real-valued array for FFTW computations.

    complex_arr : pyfftw.empty_aligned
        Preallocated complex-valued array for FFTW computations.

    rfft_objects : dict
        Cache of FFTW forward transform objects with
        (next_fast_n, n_threads, planning_flag) as lookup keys.

    irfft_objects : dict
        Cache of FFTW inverse transform objects with
        (next_fast_n, n_threads, planning_flag) as lookup keys.

    Methods
    -------
    __call__(Q, T, n_threads=1, planning_flag="FFTW_ESTIMATE")
        Compute the sliding dot product between `Q` and `T` using FFTW
        via pyfftw, and cache FFTW objects if not already cached.

    Notes
    -----
    The class maintains internal caches of FFTW objects to avoid redundant planning
    operations when called multiple times with similar-sized inputs and parameters.
    When `planning_flag=="FFTW_ESTIMATE"`, there will be no planning operation.
    However, caching FFTW objects is still beneficial as the overhead of creating
    those objects can be avoided in subsequent calls.

    Examples
    --------
    >>> sdp_obj = _PYFFTW_SLIDING_DOT_PRODUCT(max_n=1000)
    >>> Q = np.array([1, 2, 3])
    >>> T = np.array([4, 5, 6, 7, 8])
    >>> result = sdp_obj(Q, T)

    References
    ----------
    `FFTW documentation <http://www.fftw.org/>`__

    `pyfftw documentation <https://pyfftw.readthedocs.io/>`__
    """

    def __init__(self, max_n=2**20, real_dtype="float64"):
        """
        Initialize the `_PYFFTW_SLIDING_DOT_PRODUCT` object, which can be called
        to compute the sliding dot product using FFTW via pyfftw.

        Parameters
        ----------
        max_n : int, default=2**20
            Maximum length to preallocate arrays for. This will be the size of the
            real-valued array. A complex-valued array of size `1 + (max_n // 2)`
            will also be preallocated.

        real_dtype : str, default="float64"
            The real data type to use for the preallocated arrays. Must be either
            "float64" or "longdouble". The complex data type will be set to
            "complex128" or "clongdouble" respectively.

        Returns
        -------
        None
        """
        REAL_TO_COMPLEX_MAP = {
            "float64": "complex128",
            "longdouble": "clongdouble",
        }
        # Validate against the map itself so the set of supported dtypes is
        # defined in exactly one place
        if real_dtype not in REAL_TO_COMPLEX_MAP:  # pragma: no cover
            raise ValueError(
                f"Invalid real_dtype: {real_dtype}. Must be 'float64' or 'longdouble'."
            )
        complex_dtype = REAL_TO_COMPLEX_MAP[real_dtype]

        # Preallocate arrays
        self.real_arr = pyfftw.empty_aligned(max_n, dtype=real_dtype)
        self.complex_arr = pyfftw.empty_aligned(1 + (max_n // 2), dtype=complex_dtype)

        # Store FFTW objects, keyed by (next_fast_n, n_threads, planning_flag)
        self.rfft_objects = {}
        self.irfft_objects = {}

    def __call__(self, Q, T, n_threads=1, planning_flag="FFTW_ESTIMATE"):
        """
        Compute the sliding dot product between `Q` and `T` using FFTW via pyfftw,
        and cache FFTW objects if not already cached.

        Parameters
        ----------
        Q : numpy.ndarray
            Query array or subsequence.

        T : numpy.ndarray
            Time series or sequence.

        n_threads : int, default=1
            Number of threads to use for FFTW computations.

        planning_flag : str, default="FFTW_ESTIMATE"
            The planning flag that will be used in FFTW for planning.
            See pyfftw documentation for details. Current options, ordered
            ascendingly by the level of aggressiveness in planning, are:
            "FFTW_ESTIMATE", "FFTW_MEASURE", "FFTW_PATIENT", and "FFTW_EXHAUSTIVE".
            The more aggressive the planning, the longer the planning time, but
            the faster the execution time.

        Returns
        -------
        out : numpy.ndarray
            Sliding dot product between `Q` and `T`.

        Notes
        -----
        The planning_flag is defaulted to "FFTW_ESTIMATE" to be aligned with
        MATLAB's FFTW usage (as of version R2025b)
        See: https://www.mathworks.com/help/matlab/ref/fftw.html

        This implementation is inspired by the answer on StackOverflow:
        https://stackoverflow.com/a/30615425/2955541
        """
        m = Q.shape[0]
        n = T.shape[0]
        next_fast_n = pyfftw.next_fast_len(n)

        # Update preallocated arrays if needed
        if next_fast_n > len(self.real_arr):
            self.real_arr = pyfftw.empty_aligned(next_fast_n, dtype=self.real_arr.dtype)
            self.complex_arr = pyfftw.empty_aligned(
                1 + (next_fast_n // 2), dtype=self.complex_arr.dtype
            )

        real_arr = self.real_arr[:next_fast_n]
        complex_arr = self.complex_arr[: 1 + (next_fast_n // 2)]

        # Get or create FFTW objects
        key = (next_fast_n, n_threads, planning_flag)

        rfft_obj = self.rfft_objects.get(key, None)
        irfft_obj = self.irfft_objects.get(key, None)

        if rfft_obj is None or irfft_obj is None:
            rfft_obj = pyfftw.FFTW(
                input_array=real_arr,
                output_array=complex_arr,
                direction="FFTW_FORWARD",
                flags=(planning_flag,),
                threads=n_threads,
            )
            irfft_obj = pyfftw.FFTW(
                input_array=complex_arr,
                output_array=real_arr,
                direction="FFTW_BACKWARD",
                flags=(planning_flag, "FFTW_DESTROY_INPUT"),
                threads=n_threads,
            )
            self.rfft_objects[key] = rfft_obj
            self.irfft_objects[key] = irfft_obj
        else:
            # Update the input and output arrays of the cached FFTW objects
            # in case their original input and output arrays were reallocated
            # in a previous call
            rfft_obj.update_arrays(real_arr, complex_arr)
            irfft_obj.update_arrays(complex_arr, real_arr)

        # Compute the (circular) convolution between T and Q[::-1],
        # each zero-padded to length `next_fast_n` by performing
        # the following three steps:

        # Step 1
        # Compute RFFT of T (zero-padded)
        # Must make a copy of output to avoid losing it when the array is
        # overwritten when computing the RFFT of Q in the next step
        rfft_obj.input_array[:n] = T
        rfft_obj.input_array[n:] = 0.0
        rfft_obj.execute()
        rfft_T = rfft_obj.output_array.copy()

        # Step 2
        # Compute RFFT of Q (reversed, scaled, and zero-padded)
        # Scaling is required because the thin wrapper `execute`
        # that will be called below does not perform normalization
        np.multiply(Q[::-1], 1.0 / next_fast_n, out=rfft_obj.input_array[:m])
        rfft_obj.input_array[m:] = 0.0
        rfft_obj.execute()
        rfft_Q = rfft_obj.output_array

        # Step 3
        # Convert back to time domain by taking the inverse RFFT
        np.multiply(rfft_T, rfft_Q, out=irfft_obj.input_array)
        irfft_obj.execute()

        return irfft_obj.output_array[m - 1 : n]  # valid portion


# Module-level singleton: instantiating once lets the preallocated arrays and
# cached FFTW objects be reused across all subsequent sliding-dot-product calls.
# Only created when pyfftw was successfully imported above.
if PYFFTW_IS_AVAILABLE:  # pragma: no cover
    _pyfftw_sliding_dot_product = _PYFFTW_SLIDING_DOT_PRODUCT(max_n=2**20)


def _sliding_dot_product(Q, T):
"""
Compute the sliding dot product between `Q` and `T`
Expand Down
49 changes: 40 additions & 9 deletions test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -154,27 +154,57 @@ gen_ray_coveragerc()
# Generate a .coveragerc_override file that excludes Ray functions and tests
gen_coveragerc_boilerplate
echo " def .*_ray_*" >> .coveragerc_override
echo " def ,*_ray\(*" >> .coveragerc_override
echo " def .*_ray\(*" >> .coveragerc_override
echo " def ray_.*" >> .coveragerc_override
echo " def test_.*_ray*" >> .coveragerc_override
}

check_fftw_pyfftw()
{
    # Report whether the optional pyFFTW dependency can be imported
    if ! python -c "import pyfftw" &> /dev/null;
    then
        echo "pyFFTW cannot be imported."
    else
        echo "pyFFTW can be imported!"
    fi
}

gen_pyfftw_coveragerc()
{
    # Append exclusion patterns for pyFFTW-specific classes and tests to the
    # coverage override file (after writing the boilerplate header)
    gen_coveragerc_boilerplate
    {
        echo " class .*PYFFTW*"
        echo " def test_.*pyfftw*"
    } >> .coveragerc_override
}

set_coveragerc()
{
    # Build a coverage rcfile override when optional dependencies (ray,
    # FFTW/pyFFTW) are missing, so their code paths are excluded from the
    # coverage report. `fcoveragerc` is consumed by show_coverage_report.
    fcoveragerc=""

    if ! command -v ray &> /dev/null;
    then
        echo "Ray not installed"
        gen_ray_coveragerc
    else
        echo "Ray installed"
    fi

    if ! command -v fftw-wisdom &> /dev/null \
        || ! python -c "import pyfftw" &> /dev/null;
    then
        echo "FFTW and/or pyFFTW not Installed"
        gen_pyfftw_coveragerc
    else
        echo "FFTW and pyFFTW Installed"
    fi

    # Point coverage at the override file only if one was actually generated
    if [ -f .coveragerc_override ]; then
        fcoveragerc="--rcfile=.coveragerc_override"
    fi
}

show_coverage_report()
{
    # Configure the (possibly overridden) coverage rcfile, then require 100%
    # coverage, failing the script otherwise via check_errs
    set_coveragerc
    coverage report --show-missing --fail-under=100 --skip-covered --omit=fastmath.py,docstring.py,versions.py $fcoveragerc
    check_errs $?
}
Expand Down Expand Up @@ -361,6 +391,7 @@ check_print
check_pkg_imports
check_naive
check_ray
check_fftw_pyfftw


if [[ -z $NUMBA_DISABLE_JIT || $NUMBA_DISABLE_JIT -eq 0 ]]; then
Expand Down Expand Up @@ -405,4 +436,4 @@ else
test_coverage
fi

clean_up
clean_up
Loading
Loading