Source code for espnet2.fileio.sound_scp

import collections.abc
from pathlib import Path
from typing import List, Tuple, Union

import numpy as np
import soundfile
from typeguard import check_argument_types

from espnet2.fileio.read_text import read_2columns_text, read_multi_columns_text


def soundfile_read(
    wavs: Union[str, List[str]],
    dtype=None,
    always_2d: bool = False,
    concat_axis: int = 1,
    start: int = 0,
    end: Union[int, None] = None,
    return_subtype: bool = False,
) -> Union[Tuple[np.ndarray, int], Tuple[np.ndarray, int, List[str]]]:
    """Read one or more audio files and concatenate them into one array.

    Args:
        wavs: A single file path, or a list of file paths.
        dtype: dtype handed to ``SoundFile.read``. ``"float16"`` is read
            as ``"float32"`` first and cast afterwards.
        always_2d: Forwarded to ``SoundFile.read``.
        concat_axis: Axis along which the arrays of multiple files are
            concatenated. With the default ``1``, 1-dim arrays are expanded
            to (Time, Channel) before concatenation.
        start: Frame position to seek to before reading.
        end: If given, ``end - start`` frames are read; otherwise the file
            is read to its end.
        return_subtype: If True, also return the list of the subtypes of
            every file that was read.

    Returns:
        ``(array, rate)``, or ``(array, rate, subtypes)`` when
        ``return_subtype`` is True.

    Raises:
        ValueError: If ``wavs`` is empty.
        RuntimeError: If the files disagree in sampling rate, number of
            dimensions, or the size of the non-concatenated axis.
    """
    if isinstance(wavs, str):
        wavs = [wavs]
    if len(wavs) == 0:
        # Fail early with a clear message instead of inside np.concatenate.
        raise ValueError("At least one audio file must be given, but got none")

    # -1 asks soundfile to read until the end of the file
    frames = end - start if end is not None else -1

    arrays = []
    subtypes = []
    prev_rate = None
    prev_wav = None
    for wav in wavs:
        with soundfile.SoundFile(wav) as f:
            f.seek(start)
            if dtype == "float16":
                # Read as float32 and down-cast afterwards
                array = f.read(
                    frames,
                    dtype="float32",
                    always_2d=always_2d,
                ).astype(dtype)
            else:
                array = f.read(frames, dtype=dtype, always_2d=always_2d)
            rate = f.samplerate
            subtypes.append(f.subtype)

        if len(wavs) > 1 and array.ndim == 1 and concat_axis == 1:
            # array: (Time, Channel)
            array = array[:, None]

        if prev_wav is not None:
            if prev_rate != rate:
                raise RuntimeError(
                    f"'{prev_wav}' and '{wav}' have mismatched sampling rate: "
                    f"{prev_rate} != {rate}"
                )

            first = arrays[0]
            if first.ndim != array.ndim:
                raise RuntimeError(
                    f"'{wavs[0]}' and '{wav}' have mismatched number of "
                    f"dimensions: {first.ndim} != {array.ndim}"
                )
            # 1-dim arrays have no axis other than the concatenated one, so
            # there is nothing to compare (and indexing shape[1] would fail).
            if array.ndim > 1:
                dim1 = first.shape[1 - concat_axis]
                dim2 = array.shape[1 - concat_axis]
                if dim1 != dim2:
                    raise RuntimeError(
                        "Shapes must match with "
                        f"{1 - concat_axis} axis, but got {dim1} and {dim2}"
                    )

        prev_rate = rate
        prev_wav = wav
        arrays.append(array)

    if len(arrays) == 1:
        array = arrays[0]
    else:
        array = np.concatenate(arrays, axis=concat_axis)

    if return_subtype:
        return array, rate, subtypes
    return array, rate
class SoundScpReader(collections.abc.Mapping):
    """Reader class for 'wav.scp'.

    Examples:
        wav.scp is a text file that looks like the following:

        key1 /some/path/a.wav
        key2 /some/path/b.wav
        key3 /some/path/c.wav
        key4 /some/path/d.wav
        ...

        >>> reader = SoundScpReader('wav.scp')
        >>> rate, array = reader['key1']

        If multi_columns=True is given and multiple files are given in
        one line with a space delimiter, the output arrays are
        concatenated along the channel direction:

        key1 /some/path/a.wav /some/path/a2.wav
        key2 /some/path/b.wav /some/path/b2.wav
        ...

        >>> reader = SoundScpReader('wav.scp', multi_columns=True)
        >>> rate, array = reader['key1']

        In the above case, a.wav and a2.wav are concatenated.

        Note that even if multi_columns=True is given,
        SoundScpReader still supports a normal wav.scp,
        i.e., one wav file per line,
        but this option is disabled by default
        because a dict[str, list[str]] object needs to be kept,
        which increases the required amount of memory.
    """

    def __init__(
        self,
        fname,
        dtype=None,
        always_2d: bool = False,
        multi_columns: bool = False,
        concat_axis=1,
    ):
        assert check_argument_types()
        self.fname = fname
        self.dtype = dtype
        self.always_2d = always_2d

        if multi_columns:
            # Only the first returned value (the key -> paths table) is kept
            self.data, _ = read_multi_columns_text(fname)
        else:
            self.data = read_2columns_text(fname)
        self.multi_columns = multi_columns
        self.concat_axis = concat_axis

    def __getitem__(self, key) -> Tuple[int, np.ndarray]:
        """Read the audio registered under ``key`` as ``(rate, array)``."""
        wavs = self.data[key]

        array, rate = soundfile_read(
            wavs,
            dtype=self.dtype,
            always_2d=self.always_2d,
            concat_axis=self.concat_axis,
        )
        # Returned as scipy.io.wavread's order
        return rate, array

    def get_path(self, key):
        """Return the path entry stored for ``key`` without reading audio."""
        return self.data[key]

    def __contains__(self, item):
        # Membership must be decided by the scp table itself; returning
        # ``item`` made every truthy key look present.
        return item in self.data

    def __len__(self):
        return len(self.data)

    def __iter__(self):
        return iter(self.data)

    def keys(self):
        """Return the keys of the underlying scp table."""
        return self.data.keys()
class SoundScpWriter:
    """Writer class for 'wav.scp'

    Args:
        outdir: Directory under which the audio files are written.
        scpfile: Path of the scp file to generate.
        format: The output audio format, used as ``{audio_format}`` when
            building the output file names.
        multi_columns: Save multi channel data
            as multiple monaural audio files
        output_name_format: The naming format of generated audio files
        output_name_format_multi_columns: The naming format of generated audio
            files when multi_columns is given
        subtype: Passed to ``soundfile.write`` as ``subtype``.

    Examples:
        >>> writer = SoundScpWriter('./data/', './data/wav.scp')
        >>> writer['aa'] = 16000, numpy_array
        >>> writer['bb'] = 16000, numpy_array

        aa ./data/aa.wav
        bb ./data/bb.wav

        >>> writer = SoundScpWriter(
            './data/', './data/feat.scp', multi_columns=True,
        )
        >>> numpy_array.shape
        (100, 2)
        >>> writer['aa'] = 16000, numpy_array

        aa ./data/aa-CH0.wav ./data/aa-CH1.wav

    """

    def __init__(
        self,
        outdir: Union[Path, str],
        scpfile: Union[Path, str],
        format="wav",
        multi_columns: bool = False,
        output_name_format: str = "{key}.{audio_format}",
        output_name_format_multi_columns: str = "{key}-CH{channel}.{audio_format}",
        subtype: Union[str, None] = None,
    ):
        assert check_argument_types()
        self.dir = Path(outdir)
        self.dir.mkdir(parents=True, exist_ok=True)
        scpfile = Path(scpfile)
        scpfile.parent.mkdir(parents=True, exist_ok=True)
        self.fscp = scpfile.open("w", encoding="utf-8")
        self.format = format
        self.subtype = subtype
        self.output_name_format = output_name_format
        self.multi_columns = multi_columns
        self.output_name_format_multi_columns = output_name_format_multi_columns

        # key -> written path (or list of paths in multi-column mode)
        self.data = {}

    def __setitem__(
        self, key: str, value: Union[Tuple[int, np.ndarray], Tuple[np.ndarray, int]]
    ):
        """Write ``value`` as audio file(s) and add a line for ``key``.

        Args:
            key: Utterance id written in the first scp column.
            value: ``(rate, signal)`` or ``(signal, rate)``; the signal must
                be a 1-dim or 2-dim array.

        Raises:
            ValueError: If ``value`` does not have exactly 2 elements.
            TypeError: If ``value`` is not an integer / ndarray pair.
            RuntimeError: If the signal is neither 1-dim nor 2-dim.
        """
        value = list(value)
        if len(value) != 2:
            raise ValueError(f"Expecting 2 elements, but got {len(value)}")

        first, second = value
        # NumPy integer scalars are accepted too, since sampling rates are
        # often carried around as e.g. np.int64.
        int_types = (int, np.integer)
        if isinstance(first, int_types) and isinstance(second, np.ndarray):
            rate, signal = first, second
        elif isinstance(second, int_types) and isinstance(first, np.ndarray):
            signal, rate = first, second
        else:
            raise TypeError("value should be a tuple of int and numpy.ndarray")
        rate = int(rate)

        if signal.ndim not in (1, 2):
            raise RuntimeError(f"Input signal must be 1 or 2 dimension: {signal.ndim}")
        if signal.ndim == 1:
            signal = signal[:, None]

        if signal.shape[1] > 1 and self.multi_columns:
            # One monaural file per channel, all listed on a single scp line
            wavs = []
            for channel in range(signal.shape[1]):
                wav = self.dir / self.output_name_format_multi_columns.format(
                    key=key, audio_format=self.format, channel=channel
                )
                wav.parent.mkdir(parents=True, exist_ok=True)
                wav = str(wav)
                soundfile.write(wav, signal[:, channel], rate, subtype=self.subtype)
                wavs.append(wav)

            self.fscp.write(f"{key} {' '.join(wavs)}\n")

            # Store the file path
            self.data[key] = wavs
        else:
            wav = self.dir / self.output_name_format.format(
                key=key, audio_format=self.format
            )
            wav.parent.mkdir(parents=True, exist_ok=True)
            wav = str(wav)
            soundfile.write(wav, signal, rate, subtype=self.subtype)
            self.fscp.write(f"{key} {wav}\n")

            # Store the file path
            self.data[key] = wav

    def get_path(self, key):
        """Return the path(s) that were written for ``key``."""
        return self.data[key]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        """Close the scp file."""
        self.fscp.close()