Index: webrtc/modules/audio_processing/test/py_quality_assessment/quality_assessment/signal_processing.py
diff --git a/webrtc/modules/audio_processing/test/py_quality_assessment/quality_assessment/signal_processing.py b/webrtc/modules/audio_processing/test/py_quality_assessment/quality_assessment/signal_processing.py
index 10fd910cfe5cf2ac33f0c1ae6c4c1b38b236cee7..1dbff3d581f0068fdebadf1b8bb6cd9fdef94f57 100644
--- a/webrtc/modules/audio_processing/test/py_quality_assessment/quality_assessment/signal_processing.py
+++ b/webrtc/modules/audio_processing/test/py_quality_assessment/quality_assessment/signal_processing.py
@@ -11,8 +11,13 @@ import logging
 import numpy as np
 import pydub
+import pydub.generators
 import scipy.signal
 
 
+class SignalProcessingException(Exception):
+  pass
+
+
 class SignalProcessingUtils(object):
 
   def __init__(self):
@@ -110,39 +115,72 @@ class SignalProcessingUtils(object):
     return signal.apply_gain(-signal.max_dBFS)
 
   @classmethod
-  def mix_signals(cls, signal_0, signal_1, target_snr=0.0,
+  def copy(cls, signal):
+    return pydub.AudioSegment(
+        data=signal.get_array_of_samples(),
+        metadata={
+            'sample_width': signal.sample_width,
+            'frame_rate': signal.frame_rate,
+            'frame_width': signal.frame_width,
+            'channels': signal.channels,
+        })
+
+  @classmethod
+  def mix_signals(cls, signal, noise, target_snr=0.0,
                   bln_pad_shortest=False):
     """
-    Mix two signals up to a desired SNR by scaling signal_0 (signal).
+    Mix two signals up to a desired SNR by scaling noise.
+    If the target SNR is +/-Inf, a copy of signal/noise is returned.
 
     Args:
-      signal_0: AudioSegment instance (signal).
-      signal_1: AudioSegment instance (noise).
-      target_snr: float (dB).
+      signal: AudioSegment instance (signal).
+      noise: AudioSegment instance (noise).
+      target_snr: float, numpy.Inf or -numpy.Inf (dB).
       bln_pad_shortest: if True, it pads the shortest signal with silence at the
                         end.
     """
-    # Pad signal_1 (if necessary). If signal_0 is the shortest, the AudioSegment
-    # overlay() method implictly pads signal_0. Hence, the only case to handle
-    # is signal_1 shorter than signal_0 and bln_pad_shortest True.
+    # Handle infinite target SNR.
+    if target_snr == -np.Inf:
+      # Return a copy of noise.
+      logging.warning('SNR = -Inf, returning noise')
+      return cls.copy(noise)
+    elif target_snr == np.Inf:
+      # Return a copy of signal.
+      logging.warning('SNR = +Inf, returning signal')
+      return cls.copy(signal)
+
+    # Check signal and noise power.
+    signal_power = float(signal.dBFS)
+    noise_power = float(noise.dBFS)
+    if signal_power == -np.Inf:
+      logging.error('signal has -Inf power, cannot mix')
+      raise SignalProcessingException('cannot mix a signal with -Inf power')
+    if noise_power == -np.Inf:
+      logging.error('noise has -Inf power, cannot mix')
+      raise SignalProcessingException('cannot mix noise with -Inf power')
+
+    # Pad signal (if necessary). If noise is the shortest, the AudioSegment
+    # overlay() method implicitly pads noise. Hence, the only case to handle
+    # is signal shorter than noise and bln_pad_shortest True.
     if bln_pad_shortest:
-      signal_0_duration = len(signal_0)
-      signal_1_duration = len(signal_1)
-      logging.debug('mix signals with padding')
-      logging.debug('  signal_0: %d ms', signal_0_duration)
-      logging.debug('  signal_1: %d ms', signal_1_duration)
-      padding_duration = signal_0_duration - signal_1_duration
-      if padding_duration > 0:  # That is signal_1_duration < signal_0_duration.
+      signal_duration = len(signal)
+      noise_duration = len(noise)
+      logging.warning('mix signals with padding')
+      logging.warning('  signal: %d ms', signal_duration)
+      logging.warning('  noise: %d ms', noise_duration)
+      padding_duration = noise_duration - signal_duration
+      if padding_duration > 0:  # That is signal_duration < noise_duration.
         logging.debug('  padding: %d ms', padding_duration)
         padding = pydub.AudioSegment.silent(
             duration=padding_duration,
-            frame_rate=signal_0.frame_rate)
-        logging.debug('  signal_1 (pre): %d ms', len(signal_1))
-        signal_1 = signal_1 + padding
-        logging.debug('  signal_1 (post): %d ms', len(signal_1))
+            frame_rate=signal.frame_rate)
+        logging.debug('  signal (pre): %d ms', len(signal))
+        signal = signal + padding
+        logging.debug('  signal (post): %d ms', len(signal))
+
+      # Update power.
+      signal_power = float(signal.dBFS)
 
     # Mix signals using the target SNR.
-    power_0 = float(signal_0.dBFS)
-    power_1 = float(signal_1.dBFS)
-    gain_db = target_snr + power_1 - power_0
-    return cls.normalize(signal_1.overlay(signal_0.apply_gain(gain_db)))
+    gain_db = signal_power - noise_power - target_snr
+    return cls.normalize(signal.overlay(noise.apply_gain(gain_db)))
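
Note (reviewer sketch, not part of the patch): the new copy() helper rebuilds an AudioSegment from its raw samples plus format metadata, so the caller gets an independent instance. A minimal round-trip check, assuming SignalProcessingUtils from the patched module is in scope and using the newly imported pydub.generators to synthesize test audio (the 1000 ms duration is arbitrary):

    import pydub.generators

    # SignalProcessingUtils comes from the patched module above.
    original = pydub.generators.WhiteNoise(sample_rate=48000).to_audio_segment(
        duration=1000)
    duplicate = SignalProcessingUtils.copy(original)

    # Same audio data and format, but a distinct object.
    assert duplicate.get_array_of_samples() == original.get_array_of_samples()
    assert duplicate.frame_rate == original.frame_rate
    assert duplicate is not original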
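Note (reviewer sketch, not part of the patch): how the reworked mix_signals() is meant to be called. The noise gain follows from target_snr = signal_power - (noise_power + gain_db), hence gain_db = signal_power - noise_power - target_snr in the code above. The WAV file names below are hypothetical; the numpy/pydub imports mirror the module's own:

    import numpy as np
    import pydub

    # SignalProcessingUtils comes from the patched module above.
    signal = pydub.AudioSegment.from_wav('speech.wav')  # Hypothetical input.
    noise = pydub.AudioSegment.from_wav('noise.wav')    # Hypothetical input.

    # Mix at 10 dB SNR; pad the shorter input with trailing silence if needed.
    mix = SignalProcessingUtils.mix_signals(
        signal, noise, target_snr=10.0, bln_pad_shortest=True)

    # +/-Inf SNR short-circuits to a copy of signal or noise respectively.
    signal_copy = SignalProcessingUtils.mix_signals(signal, noise, target_snr=np.Inf)
    noise_copy = SignalProcessingUtils.mix_signals(signal, noise, target_snr=-np.Inf)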
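Note (reviewer sketch, not part of the patch): the new power checks assume that pydub reports digital silence as -Inf dBFS, which is exactly what the guard looks for; feeding such an input raises the new SignalProcessingException instead of producing a meaningless gain:

    import pydub
    import pydub.generators

    # SignalProcessingUtils / SignalProcessingException come from the patched
    # module above.
    silence = pydub.AudioSegment.silent(duration=1000, frame_rate=48000)
    noise = pydub.generators.WhiteNoise(sample_rate=48000).to_audio_segment(
        duration=1000)
    try:
      SignalProcessingUtils.mix_signals(silence, noise, target_snr=0.0)
    except SignalProcessingException:
      pass  # Expected: a -Inf dBFS signal cannot be mixed at a finite SNR.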