Skip to content
Open
189 changes: 189 additions & 0 deletions stumpy/sdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@

from . import config

try: # pragma: no cover
import pyfftw

FFTW_IS_AVAILABLE = True
Comment thread
NimaSarajpoor marked this conversation as resolved.
Outdated
except ImportError: # pragma: no cover
FFTW_IS_AVAILABLE = False


@njit(fastmath=config.STUMPY_FASTMATH_TRUE)
def _njit_sliding_dot_product(Q, T):
Expand Down Expand Up @@ -109,6 +116,188 @@ def _pocketfft_sliding_dot_product(Q, T):
return c2r(False, np.multiply(fft_2d[0], fft_2d[1]), n=next_fast_n)[m - 1 : n]


class _PYFFTW_SLIDING_DOT_PRODUCT:
"""
A class to compute the sliding dot product using FFTW via pyfftw.

This class uses FFTW (via pyfftw) to efficiently compute the sliding dot product
between a query sequence Q and a time series T. It preallocates arrays and caches
Comment thread
NimaSarajpoor marked this conversation as resolved.
Outdated
FFTW objects to optimize repeated computations with similar-sized inputs.

Parameters
----------
max_n : int, default=2**20
Maximum length to preallocate arrays for. This will be the size of the
real-valued array. A complex-valued array of size `1 + (max_n // 2)`
will also be preallocated. If inputs exceed this size, arrays will be
reallocated to accommodate larger sizes.

Attributes
----------
real_arr : pyfftw.empty_aligned
Preallocated real-valued array for FFTW computations.

complex_arr : pyfftw.empty_aligned
Preallocated complex-valued array for FFTW computations.

rfft_objects : dict
Cache of FFTW forward transform objects, keyed by
Comment thread
NimaSarajpoor marked this conversation as resolved.
Outdated
(next_fast_n, n_threads, planning_flag).

irfft_objects : dict
Cache of FFTW inverse transform objects, keyed by
(next_fast_n, n_threads, planning_flag).

Notes
-----
The class maintains internal caches of FFTW objects to avoid redundant planning
operations when called multiple times with similar-sized inputs and parameters.
Comment thread
NimaSarajpoor marked this conversation as resolved.

Examples
--------
>>> sdp_obj = _PYFFTW_SLIDING_DOT_PRODUCT(max_n=1000)
>>> Q = np.array([1, 2, 3])
>>> T = np.array([4, 5, 6, 7, 8])
>>> result = sdp_obj(Q, T)

References
----------
`FFTW documentation <http://www.fftw.org/>`__

`pyfftw documentation <https://pyfftw.readthedocs.io/>`__
"""

def __init__(self, max_n=2**20):
"""
Initialize the `_PYFFTW_SLIDING_DOT_PRODUCT` object, which can be called
to compute the sliding dot product using FFTW via pyfftw.

Parameters
----------
max_n : int, default=2**20
Maximum length to preallocate arrays for. This will be the size of the
real-valued array. A complex-valued array of size `1 + (max_n // 2)`
will also be preallocated.

Returns
-------
None
"""
# Preallocate arrays
self.real_arr = pyfftw.empty_aligned(max_n, dtype="float64")
self.complex_arr = pyfftw.empty_aligned(1 + (max_n // 2), dtype="complex128")

# Store FFTW objects, keyed by (next_fast_n, n_threads, planning_flag)
self.rfft_objects = {}
self.irfft_objects = {}

def __call__(self, Q, T, n_threads=1, planning_flag="FFTW_ESTIMATE"):
"""
Compute the sliding dot product between `Q` and `T` using FFTW via pyfftw,
and cache FFTW objects if not already cached.

Parameters
----------
Q : numpy.ndarray
Query array or subsequence.

T : numpy.ndarray
Time series or sequence.

n_threads : int, default=1
Number of threads to use for FFTW computations.

planning_flag : str, default="FFTW_ESTIMATE"
The planning flag that will be used in FFTW for planning.
See pyfftw documentation for details. Current options, ordered
ascendingly by the level of aggressiveness in planning, are:
"FFTW_ESTIMATE", "FFTW_MEASURE", "FFTW_PATIENT", and "FFTW_EXHAUSTIVE".
The more aggressive the planning, the longer the planning time, but
the faster the execution time.

Returns
-------
out : numpy.ndarray
Sliding dot product between `Q` and `T`.

Notes
-----
The planning_flag is defaulted to "FFTW_ESTIMATE" to be aligned with
MATLAB's FFTW usage (as of version R2025b)
See: https://www.mathworks.com/help/matlab/ref/fftw.html

This implementation is inspired by the answer on StackOverflow:
https://stackoverflow.com/a/30615425/2955541
"""
m = Q.shape[0]
n = T.shape[0]
next_fast_n = pyfftw.next_fast_len(n)

# Update preallocated arrays if needed
if next_fast_n > len(self.real_arr):
self.real_arr = pyfftw.empty_aligned(next_fast_n, dtype="float64")
self.complex_arr = pyfftw.empty_aligned(
1 + (next_fast_n // 2), dtype="complex128"
)

real_arr = self.real_arr[:next_fast_n]
complex_arr = self.complex_arr[: 1 + (next_fast_n // 2)]

# Get or create FFTW objects
key = (next_fast_n, n_threads, planning_flag)

rfft_obj = self.rfft_objects.get(key, None)
Comment thread
NimaSarajpoor marked this conversation as resolved.
if rfft_obj is None:
rfft_obj = pyfftw.FFTW(
input_array=real_arr,
output_array=complex_arr,
direction="FFTW_FORWARD",
flags=(planning_flag,),
threads=n_threads,
)
self.rfft_objects[key] = rfft_obj
else:
rfft_obj.update_arrays(real_arr, complex_arr)
Comment thread
NimaSarajpoor marked this conversation as resolved.
Outdated

irfft_obj = self.irfft_objects.get(key, None)
if irfft_obj is None:
irfft_obj = pyfftw.FFTW(
input_array=complex_arr,
output_array=real_arr,
direction="FFTW_BACKWARD",
flags=(planning_flag, "FFTW_DESTROY_INPUT"),
threads=n_threads,
)
self.irfft_objects[key] = irfft_obj
else:
irfft_obj.update_arrays(complex_arr, real_arr)

# RFFT(T)
real_arr[:n] = T
real_arr[n:] = 0.0
rfft_obj.execute() # output is in complex_arr
complex_arr_T = complex_arr.copy()
Comment thread
NimaSarajpoor marked this conversation as resolved.
Outdated

# RFFT(Q)
# Scale by 1/next_fast_n to account for
# FFTW's unnormalized inverse FFT via execute()
np.multiply(Q[::-1], 1.0 / next_fast_n, out=real_arr[:m])
real_arr[m:] = 0.0
rfft_obj.execute() # output is in complex_arr

# RFFT(T) * RFFT(Q)
np.multiply(complex_arr, complex_arr_T, out=complex_arr)
Comment thread
NimaSarajpoor marked this conversation as resolved.
Outdated

# IRFFT (input is in complex_arr)
irfft_obj.execute() # output is in real_arr

return real_arr[m - 1 : n]


if FFTW_IS_AVAILABLE: # pragma: no cover
_pyfftw_sliding_dot_product = _PYFFTW_SLIDING_DOT_PRODUCT(max_n=2**20)


def _sliding_dot_product(Q, T):
"""
Compute the sliding dot product between `Q` and `T`
Expand Down
50 changes: 41 additions & 9 deletions test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -154,27 +154,58 @@ gen_ray_coveragerc()
# Generate a .coveragerc_override file that excludes Ray functions and tests
gen_coveragerc_boilerplate
echo " def .*_ray_*" >> .coveragerc_override
echo " def ,*_ray\(*" >> .coveragerc_override
echo " def .*_ray\(*" >> .coveragerc_override
echo " def ray_.*" >> .coveragerc_override
echo " def test_.*_ray*" >> .coveragerc_override
}

set_ray_coveragerc()
check_fftw_pyfftw()
{
# If `ray` command is not found then generate a .coveragerc_override file
if ! command -v ray &> /dev/null
if ! command -v fftw-wisdom &> /dev/null \
Comment thread
NimaSarajpoor marked this conversation as resolved.
Outdated
|| ! python -c "import pyfftw" &> /dev/null;
then
echo "Ray Not Installed"
echo "FFTW and/or pyFFTW Not Installed"
else
echo "FFTW and pyFFTW Installed"
fi
}

gen_pyfftw_coveragerc()
{
gen_coveragerc_boilerplate
echo " class .*PYFFTW*" >> .coveragerc_override
echo " def test_.*pyfftw*" >> .coveragerc_override
}

set_coveragerc()
{
fcoveragerc=""

if ! command -v ray &> /dev/null;
then
echo "Ray not installed"
gen_ray_coveragerc
fcoveragerc="--rcfile=.coveragerc_override"
else
echo "Ray Installed"
echo "Ray installed"
fi

if ! command -v fftw-wisdom &> /dev/null \
|| ! python -c "import pyfftw" &> /dev/null;
then
echo "FFTW and/or pyFFTW not Installed"
gen_pyfftw_coveragerc
else
echo "FFTW and pyFFTW Installed"
fi

if [ -f .coveragerc_override ]; then
fcoveragerc="--rcfile=.coveragerc_override"
fi
}

show_coverage_report()
{
set_ray_coveragerc
set_coveragerc
coverage report --show-missing --fail-under=100 --skip-covered --omit=fastmath.py,docstring.py,versions.py $fcoveragerc
check_errs $?
}
Expand Down Expand Up @@ -361,6 +392,7 @@ check_print
check_pkg_imports
check_naive
check_ray
check_fftw_pyfftw


if [[ -z $NUMBA_DISABLE_JIT || $NUMBA_DISABLE_JIT -eq 0 ]]; then
Expand Down Expand Up @@ -405,4 +437,4 @@ else
test_coverage
fi

clean_up
clean_up
25 changes: 25 additions & 0 deletions tests/test_sdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ def get_sdp_function_names():
if func_name.endswith("sliding_dot_product"):
out.append(func_name)

if sdp.FFTW_IS_AVAILABLE: # pragma: no cover
Comment thread
NimaSarajpoor marked this conversation as resolved.
Outdated
out.append("_pyfftw_sliding_dot_product")

return out


Expand Down Expand Up @@ -152,3 +155,25 @@ def test_sdp_power2():
raise e

return


def test_pyfftw_sdp_max_n():
if not sdp.FFTW_IS_AVAILABLE: # pragma: no cover
pytest.skip("Skipping Test pyFFTW Not Installed")

# When `len(T)` larger than `max_n` in pyfftw_sdp,
# the internal preallocated arrays should be resized.
# This test checks that functionality.
max_n = 2**10
sdp_func = sdp._PYFFTW_SLIDING_DOT_PRODUCT(max_n)

# len(T) > max_n to trigger array resizing
T = np.random.rand(max_n + 1)
Q = np.random.rand(2**8)

comp = sdp_func(Q, T)
ref = naive.rolling_window_dot_product(Q, T)

np.testing.assert_allclose(comp, ref)

return