# Copyright 2022 Hitachi LTD. (Nelson Yalta)
#  Apache 2.0  (

"""ProDiff related loss module for ESPnet2."""

from math import exp
from typing import Tuple

import torch
from torch.nn import functional as F
from typeguard import check_argument_types

from espnet.nets.pytorch_backend.fastspeech.duration_predictor import (  # noqa: H301
from espnet.nets.pytorch_backend.nets_utils import make_non_pad_mask

[docs]def gaussian(window_size: int, sigma: float) -> torch.Tensor: """Gaussian Noise. Args: window_size (int): Window size. sigma (float): Noise sigma. Returns: torch.Tensor: Noise. """ gauss = torch.Tensor( [ exp(-((x - window_size // 2) ** 2) / float(2 * sigma**2)) for x in range(window_size) ] ) return gauss / gauss.sum()
[docs]class SSimLoss(torch.nn.Module): """SSimLoss. This is an implementation of structural similarity (SSIM) loss. This code is modified from """ def __init__( self, bias: float = 6.0, window_size: int = 11, channels: int = 1, reduction: str = "none", ): """Initialization. Args: bias (float, optional): value of the bias. Defaults to 6.0. window_size (int, optional): Window size. Defaults to 11. channels (int, optional): Number of channels. Defaults to 1. reduction (str, optional): Type of reduction during the loss calculation. Defaults to "none". """ super().__init__() self.bias = bias self.win_len = window_size self.channels = channels self.average = False if reduction == "mean": self.average = True win1d = gaussian(window_size, 1.5).unsqueeze(1) win2d = self.window = torch.Tensor( win2d.expand(channels, 1, window_size, window_size).contiguous() )
[docs] def forward(self, outputs: torch.Tensor, target: torch.Tensor): """Calculate forward propagation. Args: outputs (torch.Tensor): Batch of output sequences generated by the model (batch, time, mels). target (torch.Tensor): Batch of sequences with true states (batch, time, mels). Returns: Tensor: Loss scalar value. """ with torch.no_grad(): dim = target.size(-1) mask = target.abs().sum(-1, keepdim=True).ne(0).float().repeat(1, 1, dim) outputs = outputs.unsqueeze(1) + self.bias target = target.unsqueeze(1) + self.bias loss = 1 - self.ssim(outputs, target) loss = (loss * mask).sum() / mask.sum() return loss
[docs] def ssim(self, tensor1: torch.Tensor, tensor2: torch.Tensor): """Calculate SSIM loss. Args: tensor1 (torch.Tensor): Generated output. tensor2 (torch.Tensor): Groundtruth output. Returns: Tensor: Loss scalar value. """ window = mu1 = F.conv2d(tensor1, window, padding=self.win_len // 2, groups=self.channels) mu2 = F.conv2d(tensor2, window, padding=self.win_len // 2, groups=self.channels) mu_corr = mu1 * mu2 mu1 = mu1.pow(2) mu2 = mu2.pow(2) sigma1 = ( F.conv2d( tensor1 * tensor1, window, padding=self.win_len // 2, groups=self.channels, ) - mu1 ) sigma2 = ( F.conv2d( tensor2 * tensor2, window, padding=self.win_len // 2, groups=self.channels, ) - mu2 ) sigma_corr = ( F.conv2d( tensor1 * tensor2, window, padding=self.win_len // 2, groups=self.channels, ) - mu_corr ) C1 = 0.01**2 C2 = 0.03**2 ssim_map = ((2 * mu_corr + C1) * (2 * sigma_corr + C2)) / ( (mu1 + mu2 + C1) * (sigma1 + sigma2 + C2) ) if self.average: return ssim_map.mean() return ssim_map.mean(1)
[docs]class ProDiffLoss(torch.nn.Module): """Loss function module for ProDiffLoss.""" def __init__( self, use_masking: bool = True, use_weighted_masking: bool = False, ): """Initialize feed-forward Transformer loss module. Args: use_masking (bool): Whether to apply masking for padded part in loss calculation. use_weighted_masking (bool): Whether to weighted masking in loss calculation. """ assert check_argument_types() super().__init__() assert (use_masking != use_weighted_masking) or not use_masking self.use_masking = use_masking self.use_weighted_masking = use_weighted_masking # define criterions reduction = "none" if self.use_weighted_masking else "mean" self.l1_criterion = torch.nn.L1Loss(reduction=reduction) self.mse_criterion = torch.nn.MSELoss(reduction=reduction) self.duration_criterion = DurationPredictorLoss(reduction=reduction) self.ssim_criterion = SSimLoss(reduction="none")
[docs] def forward( self, after_outs: torch.Tensor, before_outs: torch.Tensor, d_outs: torch.Tensor, p_outs: torch.Tensor, e_outs: torch.Tensor, ys: torch.Tensor, ds: torch.Tensor, ps: torch.Tensor, es: torch.Tensor, ilens: torch.Tensor, olens: torch.Tensor, ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]: """Calculate forward propagation. Args: after_outs (Tensor): Batch of outputs after postnets (B, T_feats, odim). before_outs (Tensor): Batch of outputs before postnets (B, T_feats, odim). d_outs (LongTensor): Batch of outputs of duration predictor (B, T_text). p_outs (Tensor): Batch of outputs of pitch predictor (B, T_text, 1). e_outs (Tensor): Batch of outputs of energy predictor (B, T_text, 1). ys (Tensor): Batch of target features (B, T_feats, odim). ds (LongTensor): Batch of durations (B, T_text). ps (Tensor): Batch of target token-averaged pitch (B, T_text, 1). es (Tensor): Batch of target token-averaged energy (B, T_text, 1). ilens (LongTensor): Batch of the lengths of each input (B,). olens (LongTensor): Batch of the lengths of each target (B,). Returns: Tensor: L1 loss value. Tensor: Duration predictor loss value. Tensor: Pitch predictor loss value. Tensor: Energy predictor loss value. """ # First SSIM before masks ssim_loss = self.ssim_criterion(before_outs, ys) # apply mask to remove padded part if self.use_masking: out_masks = make_non_pad_mask(olens).unsqueeze(-1).to(ys.device) before_outs = before_outs.masked_select(out_masks) if after_outs is not None: after_outs = after_outs.masked_select(out_masks) ys = ys.masked_select(out_masks) duration_masks = make_non_pad_mask(ilens).to(ys.device) d_outs = d_outs.masked_select(duration_masks) ds = ds.masked_select(duration_masks) pitch_masks = make_non_pad_mask(ilens).unsqueeze(-1).to(ys.device) p_outs = p_outs.masked_select(pitch_masks) e_outs = e_outs.masked_select(pitch_masks) ps = ps.masked_select(pitch_masks) es = es.masked_select(pitch_masks) # calculate loss l1_loss = self.l1_criterion(before_outs, ys) if after_outs is not None: l1_loss += self.l1_criterion(after_outs, ys) duration_loss = self.duration_criterion(d_outs, ds) pitch_loss = self.mse_criterion(p_outs, ps) energy_loss = self.mse_criterion(e_outs, es) # make weighted mask and apply it if self.use_weighted_masking: out_masks = make_non_pad_mask(olens).unsqueeze(-1).to(ys.device) out_weights = out_masks.float() / out_masks.sum(dim=1, keepdim=True).float() out_weights /= ys.size(0) * ys.size(2) duration_masks = make_non_pad_mask(ilens).to(ys.device) duration_weights = ( duration_masks.float() / duration_masks.sum(dim=1, keepdim=True).float() ) duration_weights /= ds.size(0) # apply weight l1_loss = l1_loss.mul(out_weights).masked_select(out_masks).sum() duration_loss = ( duration_loss.mul(duration_weights).masked_select(duration_masks).sum() ) pitch_masks = duration_masks.unsqueeze(-1) pitch_weights = duration_weights.unsqueeze(-1) pitch_loss = pitch_loss.mul(pitch_weights).masked_select(pitch_masks).sum() energy_loss = ( energy_loss.mul(pitch_weights).masked_select(pitch_masks).sum() ) return l1_loss, ssim_loss, duration_loss, pitch_loss, energy_loss