# Implementation of the TCN proposed in
# Luo et al. "Conv-TasNet: Surpassing ideal time–frequency
# magnitude masking for speech separation."
#
# The code is based on:
# https://github.com/kaituoxu/Conv-TasNet/blob/master/src/conv_tasnet.py
#
import torch
import torch.nn as nn
# Machine epsilon of the default floating-point dtype, captured once at import
# time. It is added to the variance in the layer-norm forward passes below so
# that the square root in the denominator is never zero.
EPS = torch.finfo(torch.get_default_dtype()).eps
class TemporalConvNet(nn.Module):
    """Basic module of TasNet: a stack of dilated temporal convolution blocks.

    The input is normalized with channel-wise layer norm, projected to the
    bottleneck width by a 1x1 convolution and then passed through R repeats
    of X TemporalBlocks. Inside every repeat the dilation doubles from one
    block to the next (1, 2, 4, ...). The final mask 1x1 convolution of the
    original code is intentionally not part of this module.
    """

    def __init__(self, N, B, H, P, X, R, norm_type="gLN", causal=False):
        """Construct the network.

        Args:
            N: Number of filters in autoencoder
            B: Number of channels in bottleneck 1 * 1-conv block
            H: Number of channels in convolutional blocks
            P: Kernel size in convolutional blocks
            X: Number of convolutional blocks in each repeat
            R: Number of repeats
            norm_type: BN, gLN, cLN
            causal: causal or non-causal
        """
        super().__init__()

        # [M, N, K] -> [M, N, K]
        input_norm = ChannelwiseLayerNorm(N)
        # [M, N, K] -> [M, B, K]
        bottleneck = nn.Conv1d(N, B, 1, bias=False)

        # [M, B, K] -> [M, B, K]
        stacks = []
        for _ in range(R):
            stack = []
            for exponent in range(X):
                dilation = 2**exponent
                # Number of extra frames covered by the dilated kernel.
                span = (P - 1) * dilation
                # Causal: pad by the full span (the block chomps the surplus
                # trailing frames). Non-causal: pad by half the span on each
                # side. NOTE: when the span is odd (e.g. P = 2, dilation = 1)
                # the non-causal output is one frame shorter than the input.
                padding = span if causal else span // 2
                stack.append(
                    TemporalBlock(
                        B,
                        H,
                        P,
                        stride=1,
                        padding=padding,
                        dilation=dilation,
                        norm_type=norm_type,
                        causal=causal,
                    )
                )
            stacks.append(nn.Sequential(*stack))
        separator = nn.Sequential(*stacks)

        # Put together (except mask_conv1x1, modified from the original code)
        self.network = nn.Sequential(input_norm, bottleneck, separator)

    def forward(self, mixture_w):
        """Keep this API same with TasNet.

        Args:
            mixture_w: [M, N, K], M is batch size
        Returns:
            bottleneck_feature: [M, B, K]
        """
        # [M, N, K] -> [M, B, K]
        bottleneck_feature = self.network(mixture_w)
        return bottleneck_feature
class TemporalBlock(nn.Module):
    """Residual block: 1x1 conv -> PReLU -> norm -> depthwise separable conv.

    The 1x1 convolution widens the input from ``in_channels`` to
    ``out_channels``; the depthwise separable convolution maps it back to
    ``in_channels`` so that the result can be added to the block input.
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size,
        stride,
        padding,
        dilation,
        norm_type="gLN",
        causal=False,
    ):
        """Construct the block.

        Args:
            in_channels: channels of the block input and output (B)
            out_channels: channels inside the block (H)
            kernel_size: kernel size of the depthwise convolution
            stride: stride of the depthwise convolution
            padding: padding of the depthwise convolution
            dilation: dilation of the depthwise convolution
            norm_type: BN, gLN, cLN
            causal: causal or non-causal
        """
        super().__init__()
        layers = [
            # [M, B, K] -> [M, H, K]
            nn.Conv1d(in_channels, out_channels, 1, bias=False),
            nn.PReLU(),
            chose_norm(norm_type, out_channels),
            # [M, H, K] -> [M, B, K]
            DepthwiseSeparableConv(
                in_channels=out_channels,
                out_channels=in_channels,
                kernel_size=kernel_size,
                stride=stride,
                padding=padding,
                dilation=dilation,
                norm_type=norm_type,
                causal=causal,
            ),
        ]
        # Put together
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Forward.

        Args:
            x: [M, B, K]
        Returns:
            [M, B, K]
        """
        # TODO(Jing): when P = 3 here works fine, but when P = 2 maybe need to pad?
        # The residual sum is returned as is; per the original authors it
        # looks better without a ReLU applied on top of it.
        return self.net(x) + x
class DepthwiseSeparableConv(nn.Module):
    """Depthwise conv -> (chomp if causal) -> PReLU -> norm -> pointwise conv."""

    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size,
        stride,
        padding,
        dilation,
        norm_type="gLN",
        causal=False,
    ):
        """Construct the layer stack.

        Args:
            in_channels: channels of the input and of the depthwise conv (H)
            out_channels: channels produced by the pointwise conv (B)
            kernel_size: kernel size of the depthwise convolution
            stride: stride of the depthwise convolution
            padding: padding of the depthwise convolution; in causal mode the
                same number of trailing frames is removed again by Chomp1d
            dilation: dilation of the depthwise convolution
            norm_type: BN, gLN, cLN
            causal: causal or non-causal
        """
        super().__init__()

        # Use `groups` option to implement depthwise convolution
        # [M, H, K] -> [M, H, K]
        layers = [
            nn.Conv1d(
                in_channels,
                in_channels,
                kernel_size,
                stride=stride,
                padding=padding,
                dilation=dilation,
                groups=in_channels,
                bias=False,
            )
        ]
        if causal:
            # Cut the frames that the padding appended at the end of the sequence.
            layers.append(Chomp1d(padding))
        layers.append(nn.PReLU())
        layers.append(chose_norm(norm_type, in_channels))
        # [M, H, K] -> [M, B, K]
        layers.append(nn.Conv1d(in_channels, out_channels, 1, bias=False))

        # Put together
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Forward.

        Args:
            x: [M, H, K]
        Returns:
            result: [M, B, K]
        """
        result = self.net(x)
        return result
class Chomp1d(nn.Module):
    """To ensure the output length is the same as the input.

    Removes the last ``chomp_size`` frames along the final (time) axis, i.e.
    the surplus frames produced by a convolution that was padded for causal
    operation.
    """

    def __init__(self, chomp_size):
        """Store the number of trailing frames to remove.

        Args:
            chomp_size: number of frames to cut from the end of the time axis;
                0 means that nothing is removed
        """
        super().__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        """Forward.

        Args:
            x: [M, H, Kpad]
        Returns:
            [M, H, K]
        """
        if self.chomp_size == 0:
            # `x[:, :, :-0]` is `x[:, :, :0]`, i.e. an empty tensor. A zero
            # chomp (e.g. causal mode with kernel size 1, where the padding is
            # 0) must leave the input untouched instead.
            return x.contiguous()
        return x[:, :, : -self.chomp_size].contiguous()
def check_nonlinear(nolinear_type):
    """Validate the name of a nonlinearity.

    Args:
        nolinear_type: name to check; only "softmax" and "relu" are accepted
    Raises:
        ValueError: if ``nolinear_type`` is not one of the accepted names
    """
    supported = ("softmax", "relu")
    if nolinear_type in supported:
        return
    raise ValueError("Unsupported nonlinear type")
def chose_norm(norm_type, channel_size):
    """Create the normalization layer selected by ``norm_type``.

    The input of normalization will be (M, C, K), where M is batch size.
    C is channel size and K is sequence length.

    Args:
        norm_type: "gLN", "cLN" or "BN"
        channel_size: number of channels C of the tensors to normalize
    Returns:
        A GlobalLayerNorm, ChannelwiseLayerNorm or nn.BatchNorm1d instance
    Raises:
        ValueError: if ``norm_type`` is none of the supported names
    """
    if norm_type == "gLN":
        return GlobalLayerNorm(channel_size)
    if norm_type == "cLN":
        return ChannelwiseLayerNorm(channel_size)
    if norm_type == "BN":
        # Given input (M, C, K), nn.BatchNorm1d(C) will accumulate statistics
        # along M and K, so this BN usage is right.
        return nn.BatchNorm1d(channel_size)
    raise ValueError("Unsupported normalization type")
class ChannelwiseLayerNorm(nn.Module):
    """Channel-wise Layer Normalization (cLN).

    Every frame is normalized on its own with the mean and variance taken
    over the channel axis, then scaled and shifted by per-channel learnable
    parameters.
    """

    def __init__(self, channel_size):
        """Create the per-channel scale (gamma) and shift (beta) parameters.

        Args:
            channel_size: number of channels N of the tensors to normalize
        """
        super().__init__()
        param_shape = (1, channel_size, 1)  # [1, N, 1]
        self.gamma = nn.Parameter(torch.Tensor(*param_shape))
        self.beta = nn.Parameter(torch.Tensor(*param_shape))
        # The tensors above are uninitialized until this call.
        self.reset_parameters()

    def reset_parameters(self):
        """Reset to the identity transform: gamma = 1, beta = 0."""
        self.gamma.data.fill_(1)
        self.beta.data.zero_()

    def forward(self, y):
        """Forward.

        Args:
            y: [M, N, K], M is batch size, N is channel size, K is length
        Returns:
            cLN_y: [M, N, K]
        """
        channel_mean = y.mean(dim=1, keepdim=True)  # [M, 1, K]
        # Biased (population) variance over the channel axis.
        channel_var = y.var(dim=1, keepdim=True, unbiased=False)  # [M, 1, K]
        std = (channel_var + EPS) ** 0.5
        return self.gamma * (y - channel_mean) / std + self.beta
class GlobalLayerNorm(nn.Module):
    """Global Layer Normalization (gLN).

    Every batch item is normalized with a single mean and variance taken
    over both the channel and the time axis, then scaled and shifted by
    per-channel learnable parameters.
    """

    def __init__(self, channel_size):
        """Create the per-channel scale (gamma) and shift (beta) parameters.

        Args:
            channel_size: number of channels N of the tensors to normalize
        """
        super().__init__()
        param_shape = (1, channel_size, 1)  # [1, N, 1]
        self.gamma = nn.Parameter(torch.Tensor(*param_shape))
        self.beta = nn.Parameter(torch.Tensor(*param_shape))
        # The tensors above are uninitialized until this call.
        self.reset_parameters()

    def reset_parameters(self):
        """Reset to the identity transform: gamma = 1, beta = 0."""
        self.gamma.data.fill_(1)
        self.beta.data.zero_()

    def forward(self, y):
        """Forward.

        Args:
            y: [M, N, K], M is batch size, N is channel size, K is length
        Returns:
            gLN_y: [M, N, K]
        """
        # Average over channels first, then over time.
        global_mean = y.mean(dim=1, keepdim=True).mean(dim=2, keepdim=True)  # [M, 1, 1]
        squared_dev = (y - global_mean) ** 2
        global_var = squared_dev.mean(dim=1, keepdim=True).mean(
            dim=2, keepdim=True
        )  # [M, 1, 1]
        std = (global_var + EPS) ** 0.5
        return self.gamma * (y - global_mean) / std + self.beta