-
Notifications
You must be signed in to change notification settings - Fork 2
Description
See the reference OTFFT
The goal of this issue is to determine whether it is worth it to use our implementation of 6-step FFT instead of numpy/scipy's FFT. Some works were done in stumpy-dev/stumpy#967. To demonstrate the performance, I measured its running time and compared it agains sciPy.fft.rfft. The result is shown below. Any data point above line y==1 means that the implemented 6-step FFT is faster.
As observed, the SciPy is outperformed by the implemented 6-step RFFT for len(T) <= 2^10. However, after this length, SciPy's rfft shows better performance. We noticed that sliding dot product with njit is faster than fft approach for len(Q) <= 2 ^ 7. And for len(Q) between 2^8 and 2^10, the implemented RFFT is faster than Scipy only 2x. Not that 2x is bad but this comes with the cost of maintaining our FFT. So, long story short, it might be better to put this on hold for now.
6-step FFT
import math
import numpy as np
from numba import njit
@njit(fastmath=True)
def _fft_block(s, eo, x, y, c, sm):
"""
A recursive function that is used as part of fft algorithm
n : int
s : int
eo: bool
x : numpy.array 1D
y : numpy.array 1D
"""
if s == sm:
if eo:
z = y
else:
z = x
for i in range(s):
j = i + s
a = x[i]
b = x[j]
z[i] = a + b
z[j] = a - b
elif s == 1:
w = 1.0
for idx in range(sm):
a = x[idx]
b = x[idx + sm]
y[2 * idx] = a + b
y[2 * idx + 1] = (a - b) * w
w = w * c
_fft_block(2, True, y, x, c * c, sm)
elif s < sm:
w = 1.0
for i in range(0, sm, s):
for j in range(i, i + s):
a = x[j]
b = x[j + sm]
y[j + i] = a + b
y[j + i + s] = (a - b) * w
w = w * c
_fft_block(2 * s, not eo, y, x, c * c, sm)
else:
pass
@njit(fastmath=True)
def _tranpose_v1(x, n_sqrt, x_transpose):
blocksize = 32
blocksize = min(blocksize, n_sqrt)
x = x.reshape(n_sqrt, n_sqrt)
x_transpose = x_transpose.reshape(n_sqrt, n_sqrt)
for i in range(0, n_sqrt, blocksize):
for j in range(0, n_sqrt, blocksize):
x_transpose[i:i + blocksize, j:j + blocksize] = np.transpose(x[j:j + blocksize, i:i + blocksize])
return
@njit(fastmath=True)
def _eightstep_fft(x, y):
"""
Apply 8-step FFT algorithm and update x in-place.
"""
n = len(x)
m = n // 2
theta = math.pi / m
factor = math.cos(theta) - 1j * math.sin(theta)
w = 1.0
for i in range(m):
j = i + m
y[i] = x[i] + x[j]
y[j] = (x[i] - x[j]) * w
w = w * factor
_sixstep_fft(y[:m], x[:m])
_sixstep_fft(y[m:], x[m:])
for p in range(m):
x[2 * p] = y[p]
x[2 * p + 1] = y[p + m]
return
@njit(fastmath=True)
def _sixstep_fft(x, y):
"""
Apply 6-step FFT algorithm and update x in-place.
"""
n = len(x)
n_sqrt = int(np.sqrt(n))
sm = n_sqrt // 2
x = x.reshape(n_sqrt, n_sqrt)
y = y.reshape(n_sqrt, n_sqrt)
theta = 2 * math.pi / n_sqrt
c_theta = math.cos(theta) - 1j * math.sin(theta)
# step 1: matrix transpose
blocksize = 32
blocksize = min(blocksize, n_sqrt)
for i in range(0, n_sqrt, blocksize):
for j in range(0, n_sqrt, blocksize):
y[i:i + blocksize, j:j + blocksize] = np.transpose(x[j:j + blocksize, i:i + blocksize])
# step 2
for i in range(n_sqrt):
_fft_block(1, False, y[i], x[i], c_theta, sm)
# step 3 and 4: tranpose with twiddle_factor
wp = 1.0
cp = 1.0
theta = 2 * math.pi / n
factor = math.cos(theta) - 1j * math.sin(theta)
for p in range(n_sqrt):
c = cp
w = wp
y[p, p] = y[p, p] * w
for q in range(p + 1, n_sqrt):
w = w * c
y[q, p], y[p, q] = y[p, q] * w, y[q, p] * w
cp_new = factor * cp
wp = wp * cp_new * cp
cp = cp_new
# step 5
for i in range(n_sqrt):
_fft_block(1, False, y[i], x[i], c_theta, sm)
# step 6: matrix transpose
for i in range(0, n_sqrt, blocksize):
for j in range(0, n_sqrt, blocksize):
x[i:i + blocksize, j:j + blocksize] = np.transpose(y[j:j + blocksize, i:i + blocksize])
return
@njit(fastmath=True)
def _compute_fft(x, y):
n = len(x)
n_logtwo = int(np.log2(n))
if n_logtwo == 1:
a = x[0]
b = x[1]
x[0] = a + b
x[1] = a - b
elif n_logtwo % 2 == 0:
_sixstep_fft(x, y)
else:
_eightstep_fft(x, y)
return
@njit(fastmath=True)
def _compute_rfft(T):
n = len(T)
half_n = n // 2
y = np.empty(half_n + 1, dtype=np.complex128)
if half_n == 2: # special case
y[0] = T[0] + T[1] + T[2] + T[3]
y[1] = (T[0] - T[2]) - 1j * (T[1] - T[3])
y[2] = T[0] + T[2] - T[1] - T[3]
else:
x = np.empty(half_n, dtype=np.complex128)
for i in range(half_n):
x[i] = T[2 * i] + 1j * T[2 * i + 1]
_compute_fft(x, y[:half_n])
y[0] = x[0].real + x[0].imag
y[n // 4] = x[n // 4].conjugate()
y[half_n] = x[0].real - x[0].imag
w = 0.5j
factor = math.cos(math.pi / half_n) - 1j * math.sin(math.pi / half_n)
for k in range(1, n//4):
w = w * factor
v = (x[k] - x[half_n - k].conjugate()) * (0.5 + w)
y[k] = x[k] - v
y[half_n - k] = x[half_n - k] + v.conjugate()
return y
What do you think @seanlaw?