Index: modules/audio_processing/test/py_quality_assessment/quality_assessment/eval_scores.py |
diff --git a/modules/audio_processing/test/py_quality_assessment/quality_assessment/eval_scores.py b/modules/audio_processing/test/py_quality_assessment/quality_assessment/eval_scores.py |
index 78d0c18558fc3459c334f45b248e8979c544b47b..420afd224384841afb2ac479f14a5659573a3071 100644 |
--- a/modules/audio_processing/test/py_quality_assessment/quality_assessment/eval_scores.py |
+++ b/modules/audio_processing/test/py_quality_assessment/quality_assessment/eval_scores.py |
@@ -14,6 +14,13 @@ import logging |
import os |
import re |
import subprocess |
+import sys |
+ |
+try: |
+ import numpy as np |
+except ImportError: |
+ logging.critical('Cannot import the third-party Python package numpy') |
+ sys.exit(1) |
from . import data_access |
from . import exceptions |
@@ -27,6 +34,7 @@ class EvaluationScore(object): |
def __init__(self, score_filename_prefix): |
self._score_filename_prefix = score_filename_prefix |
+ self._input_signal_metadata = None |
self._reference_signal = None |
self._reference_signal_filepath = None |
self._tested_signal = None |
@@ -56,8 +64,16 @@ class EvaluationScore(object): |
def score(self): |
return self._score |
+ def SetInputSignalMetadata(self, metadata): |
+ """Sets input signal metadata. |
+ |
+ Args: |
+ metadata: dict describing the input signal (e.g., 'signal', 'frequency'). |
+ """ |
+ self._input_signal_metadata = metadata |
+ |
def SetReferenceSignalFilepath(self, filepath): |
- """ Sets the path to the audio track used as reference signal. |
+ """Sets the path to the audio track used as reference signal. |
Args: |
filepath: path to the reference audio track. |
@@ -65,7 +81,7 @@ class EvaluationScore(object): |
self._reference_signal_filepath = filepath |
def SetTestedSignalFilepath(self, filepath): |
- """ Sets the path to the audio track used as test signal. |
+ """Sets the path to the audio track used as test signal. |
Args: |
filepath: path to the test audio track. |
@@ -242,3 +258,84 @@ class PolqaScore(EvaluationScore): |
# Build and return a dictionary with field names (header) as keys and the |
# corresponding field values as values. |
return {data[0][index]: data[1][index] for index in range(number_of_fields)} |
+ |
+ |
+@EvaluationScore.RegisterClass |
+class TotalHarmonicDistorsionScore(EvaluationScore): |
+ """Total harmonic distortion plus noise score. |
+ |
+ Level of the output without its fundamental over the fundamental amplitude. |
+ See "https://en.wikipedia.org/wiki/Total_harmonic_distortion#THD.2BN". |
+ |
+ Unit: - (dimensionless ratio). |
+ Ideal: 0. |
+ Worst case: +inf. |
+ """ |
+ |
+ NAME = 'thd' |
+ |
+ def __init__(self, score_filename_prefix): |
+ EvaluationScore.__init__(self, score_filename_prefix) |
+ self._input_frequency = None |
+ |
+ def _Run(self, output_path): |
+ # TODO(aleloi): Integrate changes made locally. |
+ self._CheckInputSignal() |
+ |
+ self._LoadTestedSignal() |
+ if self._tested_signal.channels != 1: |
+ raise exceptions.EvaluationScoreException( |
+ 'unsupported number of channels') |
+ samples = signal_processing.SignalProcessingUtils.AudioSegmentToRawData( |
+ self._tested_signal) |
+ |
+ # Init. |
+ num_samples = len(samples) |
+ duration = len(self._tested_signal) / 1000.0 |
+ scaling = 2.0 / num_samples |
+ max_freq = self._tested_signal.frame_rate / 2 |
+ f0_freq = float(self._input_frequency) |
+ t = np.linspace(0, duration, num_samples) |
+ |
+ # Estimate the amplitude of each harmonic below the Nyquist frequency. |
+ b_terms = [] |
+ n = 1 |
+ while f0_freq * n < max_freq: |
+ x_n = np.sum(samples * np.sin(2.0 * np.pi * n * f0_freq * t)) * scaling |
+ y_n = np.sum(samples * np.cos(2.0 * np.pi * n * f0_freq * t)) * scaling |
+ b_terms.append(np.sqrt(x_n**2 + y_n**2)) |
+ n += 1 |
+ |
+ output_without_fundamental = samples - b_terms[0] * np.sin( |
+ 2.0 * np.pi * f0_freq * t) |
+ distortion_and_noise = np.sqrt(np.sum( |
+ output_without_fundamental**2) * np.pi * scaling) |
+ |
+ # TODO(alessiob): Fix (|b_terms| is a list, `**2` needs an array) or remove. |
+ # thd = np.sqrt(np.sum(b_terms[1:]**2)) / b_terms[0] |
+ |
+ # TODO(alessiob): Check the range of |thd_plus_noise| and update the class |
+ # docstring above accordingly. |
+ thd_plus_noise = distortion_and_noise / b_terms[0] |
+ |
+ self._score = thd_plus_noise |
+ self._SaveScore() |
+ |
+ def _CheckInputSignal(self): |
+ # Require a pure tone and the identity generator; get the tone frequency. |
+ try: |
+ if self._input_signal_metadata['signal'] != 'pure_tone': |
+ raise exceptions.EvaluationScoreException( |
+ 'The THD score requires a pure tone as input signal') |
+ self._input_frequency = self._input_signal_metadata['frequency'] |
+ if self._input_signal_metadata['test_data_gen_name'] != 'identity' or ( |
+ self._input_signal_metadata['test_data_gen_config'] != 'default'): |
+ raise exceptions.EvaluationScoreException( |
+ 'The THD score cannot be used with any test data generator other ' |
+ 'than "identity"') |
+ except TypeError: |
+ raise exceptions.EvaluationScoreException( |
+ 'The THD score requires an input signal with associated metadata') |
+ except KeyError: |
+ raise exceptions.EvaluationScoreException( |
+ 'Invalid input signal metadata to compute the THD score') |