Update DOCS.md

This commit is contained in:
Alexandre
2024-10-27 16:57:09 +01:00
committed by GitHub
parent bd3e0922c1
commit 06c2811da0

View File

@@ -430,13 +430,13 @@ Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py"
```python
#!/usr/bin/env python3
"""
Microphone Gain Adjustment Script with THD and Overload Detection
Microphone Gain Adjustment Script with Clipping and Overload Detection
This script captures audio from an RTSP stream, processes it to calculate the RMS
within the 2000-8000 Hz frequency band, detects clipping, calculates Total Harmonic
Distortion (THD), and adjusts the microphone gain based on predefined noise thresholds,
trends, and distortion metrics.
within the 2000-8000 Hz frequency band, detects clipping, calculates Sound Pressure Level (SPL),
and adjusts the microphone gain based on predefined noise thresholds, trends, and overload metrics.
Dependencies:
- numpy
@@ -444,13 +444,29 @@ Dependencies:
- ffmpeg (installed and accessible in PATH)
- amixer (for microphone gain control)
Author: OpenAI ChatGPT
Date: 2024-04-27 (Updated)
Author: alexbelgium
Date: 27-Oct-2024
Changelog:
-----------
2024-04-27: Initial version
- Implemented basic microphone gain adjustment based on RMS levels and Total Harmonic Distortion (THD) calculations.
- Introduced overload detection based on Sound Pressure Level (SPL).
2024-10-27: Updated for simplified noise and clipping detection
- Removed THD calculations, as natural bird harmonics affect the distortion metric.
- Introduced direct clipping detection by analyzing audio sample amplitudes.
- Refocused the gain adjustment criteria on RMS amplitude and SPL within the target band (2000-8000 Hz).
- Simplified main loop to focus on RMS, SPL, and clipping instead of THD.
- Added `detect_clipping` function to identify clipping events.
- Updated debug logging to enhance traceability and include SPL measurements.
- Adjusted trend detection logic for more responsive gain adjustment.
"""
import subprocess
import numpy as np
from scipy.signal import butter, sosfilt, find_peaks
from scipy.signal import butter, sosfilt
import time
import re
@@ -484,10 +500,6 @@ MIC_CLIPPING_SPL = 120 # dB SPL at 1 kHz
# Calibration Constants (These may need to be adjusted based on actual calibration)
REFERENCE_PRESSURE = 20e-6 # 20 µPa, standard reference for SPL
# THD Settings
THD_FUNDAMENTAL_THRESHOLD_DB = 60 # Minimum SPL to consider THD calculation
MAX_THD_PERCENTAGE = 5.0 # Maximum acceptable THD percentage
# -----------------------------------------------------------------------
@@ -544,85 +556,6 @@ def set_gain_db(mic_name, gain_db):
return False
def find_fundamental_frequency(fft_freqs, fft_magnitude, min_freq=1000, max_freq=8000):
"""
Dynamically finds the fundamental frequency within a specified range.
:param fft_freqs: Array of frequency bins from FFT.
:param fft_magnitude: Magnitude spectrum from FFT.
:param min_freq: Minimum frequency to search for the fundamental.
:param max_freq: Maximum frequency to search for the fundamental.
:return: Fundamental frequency in Hz and its amplitude.
"""
# Limit search to the specified frequency range
idx_min = np.searchsorted(fft_freqs, min_freq)
idx_max = np.searchsorted(fft_freqs, max_freq)
if idx_max <= idx_min:
return None, 0
search_magnitude = fft_magnitude[idx_min:idx_max]
search_freqs = fft_freqs[idx_min:idx_max]
# Find peaks in the magnitude spectrum
peaks, properties = find_peaks(search_magnitude, height=np.max(search_magnitude) * 0.1)
if len(peaks) == 0:
return None, 0
# Identify the peak with the highest magnitude
peak_heights = properties['peak_heights']
max_peak_idx = np.argmax(peak_heights)
fundamental_freq = search_freqs[peaks[max_peak_idx]]
fundamental_amplitude = search_magnitude[peaks[max_peak_idx]]
debug_print(f"Detected fundamental frequency: {fundamental_freq:.2f} Hz with amplitude {fundamental_amplitude:.4f}")
return fundamental_freq, fundamental_amplitude
def thd_calculation(audio, sampling_rate, num_harmonics=5):
"""
Calculates Total Harmonic Distortion (THD) for the audio signal.
:param audio: The audio signal as a numpy array.
:param sampling_rate: Sampling rate of the audio signal.
:param num_harmonics: Number of harmonics to include in THD calculation.
:return: THD value in percentage.
"""
# FFT analysis
fft_vals = np.fft.rfft(audio)
fft_freqs = np.fft.rfftfreq(len(audio), 1 / sampling_rate)
fft_magnitude = np.abs(fft_vals)
# Dynamically find the fundamental frequency
fundamental_freq, fundamental_amplitude = find_fundamental_frequency(fft_freqs, fft_magnitude)
if fundamental_freq is None or fundamental_amplitude < 1e-6:
debug_print("Fundamental frequency not detected or amplitude too low. Skipping THD calculation.")
return 0.0
# Calculate harmonic amplitudes
harmonic_amplitudes = []
for n in range(2, num_harmonics + 1):
harmonic_freq = n * fundamental_freq
if harmonic_freq > sampling_rate / 2:
break # Skip harmonics beyond Nyquist frequency
# Find the closest frequency bin
harmonic_idx = np.argmin(np.abs(fft_freqs - harmonic_freq))
harmonic_amp = fft_magnitude[harmonic_idx]
harmonic_amplitudes.append(harmonic_amp)
debug_print(f"Harmonic {n} frequency: {harmonic_freq:.2f} Hz, amplitude: {harmonic_amp:.4f}")
# Calculate THD
harmonic_sum = np.sqrt(np.sum(np.square(harmonic_amplitudes)))
if fundamental_amplitude == 0:
thd = 0.0
else:
thd = (harmonic_sum / fundamental_amplitude) * 100 # THD in percentage
debug_print(f"THD Calculation: {thd:.2f}%")
return thd
def calculate_spl(audio, mic_sensitivity_db):
"""
Calculates the Sound Pressure Level (SPL) from the audio signal.
@@ -638,9 +571,7 @@ def calculate_spl(audio, mic_sensitivity_db):
return -np.inf
# Convert RMS amplitude to voltage
# Assuming audio is normalized between -1 and 1, representing the actual voltage would require calibration
# For demonstration, we'll proceed with the given sensitivity
# Assuming audio is normalized between -1 and 1
# Convert voltage to pressure (Pa)
mic_sensitivity_linear = 10 ** (mic_sensitivity_db / 20) # V/Pa
pressure = rms_amplitude / mic_sensitivity_linear # Pa
@@ -665,15 +596,28 @@ def detect_microphone_overload(spl, mic_clipping_spl):
return False
def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins=5):
def detect_clipping(audio):
"""
Captures audio from an RTSP stream, calculates RMS, THD, and SPL, and detects microphone overload.
Detects if clipping has occurred in the audio signal.
:param audio: The audio signal as a numpy array.
:return: True if clipping is detected, False otherwise.
"""
max_amplitude = np.max(np.abs(audio))
if max_amplitude >= 1.0:
debug_print("Clipping detected in audio signal.")
return True
return False
def calculate_noise_rms_and_spl(rtsp_url, bandpass_sos, sampling_rate):
"""
Captures audio from an RTSP stream, calculates RMS, SPL, and detects microphone overload.
:param rtsp_url: The RTSP stream URL.
:param bandpass_sos: Precomputed bandpass filter coefficients (Second-Order Sections).
:param sampling_rate: Sampling rate of the audio signal.
:param num_bins: Number of segments to divide the audio into.
:return: Tuple containing the RMS amplitude, THD percentage, SPL value, and overload status.
:return: Tuple containing the RMS amplitude, SPL value, overload status, and clipping status.
"""
cmd = [
'ffmpeg',
@@ -696,7 +640,7 @@ def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins=
if process.returncode != 0:
debug_print(f"ffmpeg failed with error: {stderr.decode()}")
return None, None, None, False
return None, None, False, False
# Convert raw PCM data to numpy array
audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0
@@ -704,7 +648,10 @@ def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins=
if len(audio) == 0:
debug_print("No audio data captured.")
return None, None, None, False
return None, None, False, False
# Detect clipping
clipping = detect_clipping(audio)
# Apply bandpass filter
filtered_audio = sosfilt(bandpass_sos, audio)
@@ -713,25 +660,22 @@ def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins=
# Calculate RMS
rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2))
# Calculate THD
thd_percentage = thd_calculation(filtered_audio, sampling_rate)
# Calculate SPL
spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB)
# Detect microphone overload
overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL)
return rms_amplitude, thd_percentage, spl, overload
return rms_amplitude, spl, overload, clipping
except Exception as e:
debug_print(f"Exception during audio processing: {e}")
return None, None, None, False
return None, None, False, False
def main():
"""
Main loop that continuously monitors background noise, detects clipping, calculates THD,
Main loop that continuously monitors background noise, detects clipping,
and adjusts microphone gain accordingly.
"""
TREND_COUNT = 0
@@ -755,19 +699,20 @@ def main():
return
while True:
rms, thd, spl, overload = calculate_noise_rms_and_thd(RTSP_URL, sos, SAMPLING_RATE)
rms, spl, overload, clipping = calculate_noise_rms_and_spl(RTSP_URL, sos, SAMPLING_RATE)
if rms is None:
print("Failed to compute noise RMS. Retrying in 1 minute...")
time.sleep(60)
continue
# Print the final converted RMS amplitude
print(f"Converted RMS Amplitude: {rms:.6f}")
# Print the final RMS amplitude
print(f"RMS Amplitude: {rms:.6f}")
debug_print(f"Current background noise (RMS amplitude): {rms:.6f}")
debug_print(f"Calculated SPL: {spl:.2f} dB")
# Detect clipping and reduce gain if needed
if overload:
if overload or clipping:
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = current_gain_db - CLIPPING_REDUCTION_DB
@@ -775,34 +720,14 @@ def main():
NEW_GAIN_DB = MIN_GAIN_DB
success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB)
if success:
print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to clipping.")
print(f"Overload or clipping detected. Reduced gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to overload or clipping.")
else:
print("Failed to reduce gain due to clipping.")
# Skip trend adjustment in case of clipping
print("Failed to reduce gain due to overload or clipping.")
# Skip trend adjustment in case of overload or clipping
time.sleep(60)
continue
# Handle THD if SPL is above a reasonable threshold
if spl >= THD_FUNDAMENTAL_THRESHOLD_DB:
if thd > MAX_THD_PERCENTAGE:
debug_print(f"High THD detected: {thd:.2f}%")
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = current_gain_db - DECREASE_GAIN_STEP_DB
if NEW_GAIN_DB < MIN_GAIN_DB:
NEW_GAIN_DB = MIN_GAIN_DB
success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB)
if success:
print(f"High THD detected. Decreased gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high THD.")
else:
print("Failed to adjust gain based on THD.")
else:
debug_print("THD within acceptable limits.")
else:
debug_print("SPL below THD calculation threshold. Skipping THD check.")
# Determine the noise trend
if rms > NOISE_THRESHOLD_HIGH:
CURRENT_TREND = 1