from typing import Optional, Tuple, Union
import librosa
import numpy as np
import torch
from packaging.version import parse as V
from torch_complex.tensor import ComplexTensor
from typeguard import check_argument_types
from espnet2.enh.layers.complex_utils import is_complex
from espnet2.layers.inversible_interface import InversibleInterface
from espnet.nets.pytorch_backend.nets_utils import make_pad_mask
is_torch_1_10_plus = V(torch.__version__) >= V("1.10.0")
is_torch_1_9_plus = V(torch.__version__) >= V("1.9.0")
is_torch_1_7_plus = V(torch.__version__) >= V("1.7")
[docs]class Stft(torch.nn.Module, InversibleInterface):
def __init__(
self,
n_fft: int = 512,
win_length: int = None,
hop_length: int = 128,
window: Optional[str] = "hann",
center: bool = True,
normalized: bool = False,
onesided: bool = True,
):
assert check_argument_types()
super().__init__()
self.n_fft = n_fft
if win_length is None:
self.win_length = n_fft
else:
self.win_length = win_length
self.hop_length = hop_length
self.center = center
self.normalized = normalized
self.onesided = onesided
if window is not None and not hasattr(torch, f"{window}_window"):
raise ValueError(f"{window} window is not implemented")
self.window = window
[docs] def forward(
self, input: torch.Tensor, ilens: torch.Tensor = None
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
"""STFT forward function.
Args:
input: (Batch, Nsamples) or (Batch, Nsample, Channels)
ilens: (Batch)
Returns:
output: (Batch, Frames, Freq, 2) or (Batch, Frames, Channels, Freq, 2)
"""
bs = input.size(0)
if input.dim() == 3:
multi_channel = True
# input: (Batch, Nsample, Channels) -> (Batch * Channels, Nsample)
input = input.transpose(1, 2).reshape(-1, input.size(1))
else:
multi_channel = False
# NOTE(kamo):
# The default behaviour of torch.stft is compatible with librosa.stft
# about padding and scaling.
# Note that it's different from scipy.signal.stft
# output: (Batch, Freq, Frames, 2=real_imag)
# or (Batch, Channel, Freq, Frames, 2=real_imag)
if self.window is not None:
window_func = getattr(torch, f"{self.window}_window")
window = window_func(
self.win_length, dtype=input.dtype, device=input.device
)
else:
window = None
# For the compatibility of ARM devices, which do not support
# torch.stft() due to the lack of MKL (on older pytorch versions),
# there is an alternative replacement implementation with librosa.
# Note: pytorch >= 1.10.0 now has native support for FFT and STFT
# on all cpu targets including ARM.
if is_torch_1_10_plus or input.is_cuda or torch.backends.mkl.is_available():
stft_kwargs = dict(
n_fft=self.n_fft,
win_length=self.win_length,
hop_length=self.hop_length,
center=self.center,
window=window,
normalized=self.normalized,
onesided=self.onesided,
)
if is_torch_1_7_plus:
stft_kwargs["return_complex"] = False
output = torch.stft(input, **stft_kwargs)
else:
if self.training:
raise NotImplementedError(
"stft is implemented with librosa on this device, which does not "
"support the training mode."
)
# use stft_kwargs to flexibly control different PyTorch versions' kwargs
# note: librosa does not support a win_length that is < n_ftt
# but the window can be manually padded (see below).
stft_kwargs = dict(
n_fft=self.n_fft,
win_length=self.n_fft,
hop_length=self.hop_length,
center=self.center,
window=window,
pad_mode="reflect",
)
if window is not None:
# pad the given window to n_fft
n_pad_left = (self.n_fft - window.shape[0]) // 2
n_pad_right = self.n_fft - window.shape[0] - n_pad_left
stft_kwargs["window"] = torch.cat(
[torch.zeros(n_pad_left), window, torch.zeros(n_pad_right)], 0
).numpy()
else:
win_length = (
self.win_length if self.win_length is not None else self.n_fft
)
stft_kwargs["window"] = torch.ones(win_length)
output = []
# iterate over istances in a batch
for i, instance in enumerate(input):
stft = librosa.stft(input[i].numpy(), **stft_kwargs)
output.append(torch.tensor(np.stack([stft.real, stft.imag], -1)))
output = torch.stack(output, 0)
if not self.onesided:
len_conj = self.n_fft - output.shape[1]
conj = output[:, 1 : 1 + len_conj].flip(1)
conj[:, :, :, -1].data *= -1
output = torch.cat([output, conj], 1)
if self.normalized:
output = output * (stft_kwargs["window"].shape[0] ** (-0.5))
# output: (Batch, Freq, Frames, 2=real_imag)
# -> (Batch, Frames, Freq, 2=real_imag)
output = output.transpose(1, 2)
if multi_channel:
# output: (Batch * Channel, Frames, Freq, 2=real_imag)
# -> (Batch, Frame, Channel, Freq, 2=real_imag)
output = output.view(bs, -1, output.size(1), output.size(2), 2).transpose(
1, 2
)
if ilens is not None:
if self.center:
pad = self.n_fft // 2
ilens = ilens + 2 * pad
if is_torch_1_9_plus:
olens = (
torch.div(
ilens - self.n_fft, self.hop_length, rounding_mode="trunc"
)
+ 1
)
else:
olens = (ilens - self.n_fft) // self.hop_length + 1
output.masked_fill_(make_pad_mask(olens, output, 1), 0.0)
else:
olens = None
return output, olens
[docs] def inverse(
self, input: Union[torch.Tensor, ComplexTensor], ilens: torch.Tensor = None
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
"""Inverse STFT.
Args:
input: Tensor(batch, T, F, 2) or ComplexTensor(batch, T, F)
ilens: (batch,)
Returns:
wavs: (batch, samples)
ilens: (batch,)
"""
if V(torch.__version__) >= V("1.6.0"):
istft = torch.functional.istft
else:
try:
import torchaudio
except ImportError:
raise ImportError(
"Please install torchaudio>=0.3.0 or use torch>=1.6.0"
)
if not hasattr(torchaudio.functional, "istft"):
raise ImportError(
"Please install torchaudio>=0.3.0 or use torch>=1.6.0"
)
istft = torchaudio.functional.istft
if self.window is not None:
window_func = getattr(torch, f"{self.window}_window")
if is_complex(input):
datatype = input.real.dtype
else:
datatype = input.dtype
window = window_func(self.win_length, dtype=datatype, device=input.device)
else:
window = None
if is_complex(input):
input = torch.stack([input.real, input.imag], dim=-1)
elif input.shape[-1] != 2:
raise TypeError("Invalid input type")
input = input.transpose(1, 2)
wavs = istft(
input,
n_fft=self.n_fft,
hop_length=self.hop_length,
win_length=self.win_length,
window=window,
center=self.center,
normalized=self.normalized,
onesided=self.onesided,
length=ilens.max() if ilens is not None else ilens,
)
return wavs, ilens