Update DOCS.md

This commit is contained in:
Alexandre
2024-10-18 12:00:17 +02:00
committed by GitHub
parent 2876fe9f50
commit 2ca05e3f38

View File

@@ -391,11 +391,12 @@ Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py"
```python
#!/usr/bin/env python3
"""
Microphone Gain Adjustment Script
Microphone Gain Adjustment Script with THD and Overload Detection
This script captures audio from an RTSP stream, processes it to calculate the RMS
within the 2000-4000 Hz frequency band, detects clipping, and adjusts the microphone
gain based on predefined noise thresholds, trends, and clipping detection.
within the 2000-8000 Hz frequency band, detects clipping, calculates Total Harmonic
Distortion (THD), and adjusts the microphone gain based on predefined noise thresholds,
trends, and distortion metrics.
Dependencies:
- numpy
@@ -404,12 +405,12 @@ Dependencies:
- amixer (for microphone gain control)
Author: OpenAI ChatGPT
Date: 2024-04-27
Date: 2024-04-27 (Updated)
"""
import subprocess
import numpy as np
from scipy.signal import butter, sosfilt
from scipy.signal import butter, sosfilt, find_peaks
import time
import re
@@ -424,11 +425,11 @@ INCREASE_GAIN_STEP_DB = 5 # Gain increase step in dB
CLIPPING_REDUCTION_DB = 3 # Reduction in dB if clipping is detected
# Noise Thresholds
NOISE_THRESHOLD_HIGH = 0.001 # Upper threshold for noise RMS amplitude
NOISE_THRESHOLD_LOW = 0.00035 # Lower threshold for noise RMS amplitude
NOISE_THRESHOLD_HIGH = 0.001 # Upper threshold for noise RMS amplitude
NOISE_THRESHOLD_LOW = 0.00035 # Lower threshold for noise RMS amplitude
# Trend Detection
TREND_COUNT_THRESHOLD = 1 # Number of consecutive trends needed to adjust gain
TREND_COUNT_THRESHOLD = 3 # Number of consecutive trends needed to adjust gain
# RTSP Stream URL
RTSP_URL = "rtsp://192.168.178.124:8554/birdmic" # Replace with your RTSP stream URL
@@ -436,10 +437,21 @@ RTSP_URL = "rtsp://192.168.178.124:8554/birdmic" # Replace with your RTSP strea
# Debug Mode (1 for enabled, 0 for disabled)
DEBUG = 1
# Microphone Characteristics
MIC_SENSITIVITY_DB = -28 # dB (0 dB = 1V/Pa)
MIC_CLIPPING_SPL = 120 # dB SPL at 1 kHz
# Calibration Constants (These may need to be adjusted based on actual calibration)
REFERENCE_PRESSURE = 20e-6 # 20 µPa, standard reference for SPL
# THD Settings
THD_FUNDAMENTAL_THRESHOLD_DB = 60 # Minimum SPL to consider THD calculation
MAX_THD_PERCENTAGE = 5.0 # Maximum acceptable THD percentage
# -----------------------------------------------------------------------
def debug(msg):
def debug_print(msg):
"""
Prints debug messages if DEBUG mode is enabled.
@@ -464,13 +476,13 @@ def get_gain_db(mic_name):
match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output)
if match:
gain_db = float(match.group(1))
debug(f"Retrieved gain: {gain_db} dB")
debug_print(f"Retrieved gain: {gain_db} dB")
return gain_db
else:
debug("No gain information found in amixer output.")
debug_print("No gain information found in amixer output.")
return None
except subprocess.CalledProcessError as e:
debug(f"amixer sget failed: {e}")
debug_print(f"amixer sget failed: {e}")
return None
@@ -485,36 +497,143 @@ def set_gain_db(mic_name, gain_db):
cmd = ['amixer', 'sset', mic_name, f'{gain_db}dB']
try:
subprocess.check_call(cmd, stderr=subprocess.STDOUT)
debug(f"Set gain to: {gain_db} dB")
debug_print(f"Set gain to: {gain_db} dB")
return True
except subprocess.CalledProcessError as e:
debug(f"amixer sset failed: {e}")
debug_print(f"amixer sset failed: {e}")
return False
def detect_clipping(audio):
def find_fundamental_frequency(fft_freqs, fft_magnitude, min_freq=100, max_freq=5000):
"""
Detects if clipping occurs in the audio signal.
Dynamically finds the fundamental frequency within a specified range.
:param fft_freqs: Array of frequency bins from FFT.
:param fft_magnitude: Magnitude spectrum from FFT.
:param min_freq: Minimum frequency to search for the fundamental.
:param max_freq: Maximum frequency to search for the fundamental.
:return: Fundamental frequency in Hz and its amplitude.
"""
# Limit search to the specified frequency range
idx_min = np.searchsorted(fft_freqs, min_freq)
idx_max = np.searchsorted(fft_freqs, max_freq)
if idx_max <= idx_min:
return None, 0
search_magnitude = fft_magnitude[idx_min:idx_max]
search_freqs = fft_freqs[idx_min:idx_max]
# Find peaks in the magnitude spectrum
peaks, properties = find_peaks(search_magnitude, height=np.max(search_magnitude) * 0.1)
if len(peaks) == 0:
return None, 0
# Identify the peak with the highest magnitude
peak_heights = properties['peak_heights']
max_peak_idx = np.argmax(peak_heights)
fundamental_freq = search_freqs[peaks[max_peak_idx]]
fundamental_amplitude = search_magnitude[peaks[max_peak_idx]]
debug_print(f"Detected fundamental frequency: {fundamental_freq:.2f} Hz with amplitude {fundamental_amplitude:.4f}")
return fundamental_freq, fundamental_amplitude
def thd_calculation(audio, sampling_rate, num_harmonics=5):
"""
Calculates Total Harmonic Distortion (THD) for the audio signal.
:param audio: The audio signal as a numpy array.
:return: True if clipping is detected, False otherwise.
:param sampling_rate: Sampling rate of the audio signal.
:param num_harmonics: Number of harmonics to include in THD calculation.
:return: THD value in percentage.
"""
CLIPPING_THRESHOLD = 1.0 # Normalized PCM16 max value is ±1.0
if np.any(audio >= CLIPPING_THRESHOLD) or np.any(audio <= -CLIPPING_THRESHOLD):
debug("Clipping detected in audio signal.")
# FFT analysis
fft_vals = np.fft.rfft(audio)
fft_freqs = np.fft.rfftfreq(len(audio), 1 / sampling_rate)
fft_magnitude = np.abs(fft_vals)
# Dynamically find the fundamental frequency
fundamental_freq, fundamental_amplitude = find_fundamental_frequency(fft_freqs, fft_magnitude)
if fundamental_freq is None or fundamental_amplitude < 1e-6:
debug_print("Fundamental frequency not detected or amplitude too low. Skipping THD calculation.")
return 0.0
# Calculate harmonic amplitudes
harmonic_amplitudes = []
for n in range(2, num_harmonics + 1):
harmonic_freq = n * fundamental_freq
if harmonic_freq > sampling_rate / 2:
break # Skip harmonics beyond Nyquist frequency
# Find the closest frequency bin
harmonic_idx = np.argmin(np.abs(fft_freqs - harmonic_freq))
harmonic_amp = fft_magnitude[harmonic_idx]
harmonic_amplitudes.append(harmonic_amp)
debug_print(f"Harmonic {n} frequency: {harmonic_freq:.2f} Hz, amplitude: {harmonic_amp:.4f}")
# Calculate THD
harmonic_sum = np.sqrt(np.sum(np.square(harmonic_amplitudes)))
if fundamental_amplitude == 0:
thd = 0.0
else:
thd = (harmonic_sum / fundamental_amplitude) * 100 # THD in percentage
debug_print(f"THD Calculation: {thd:.2f}%")
return thd
def calculate_spl(audio, mic_sensitivity_db):
"""
Calculates the Sound Pressure Level (SPL) from the audio signal.
:param audio: The audio signal as a numpy array.
:param mic_sensitivity_db: Microphone sensitivity in dB (0 dB = 1V/Pa).
:return: SPL in dB.
"""
# Calculate RMS amplitude
rms_amplitude = np.sqrt(np.mean(audio ** 2))
if rms_amplitude == 0:
debug_print("RMS amplitude is zero. SPL cannot be calculated.")
return -np.inf
# Convert RMS amplitude to voltage
# Assuming audio is normalized between -1 and 1, representing the actual voltage would require calibration
# For demonstration, we'll proceed with the given sensitivity
# Convert voltage to pressure (Pa)
mic_sensitivity_linear = 10 ** (mic_sensitivity_db / 20) # V/Pa
pressure = rms_amplitude / mic_sensitivity_linear # Pa
# Calculate SPL
spl = 20 * np.log10(pressure / REFERENCE_PRESSURE)
debug_print(f"Calculated SPL: {spl:.2f} dB")
return spl
def detect_microphone_overload(spl, mic_clipping_spl):
"""
Detects if the calculated SPL is approaching the microphone's clipping SPL.
:param spl: The calculated SPL.
:param mic_clipping_spl: The microphone's clipping SPL.
:return: True if overload is detected, False otherwise.
"""
if spl >= mic_clipping_spl - 3: # Consider overload if within 3 dB of clipping SPL
debug_print("Microphone overload detected.")
return True
return False
def calculate_noise_rms(rtsp_url, bandpass_sos, num_bins=5):
def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins=5):
"""
Captures audio from an RTSP stream, applies a bandpass filter, divides the
audio into segments, and calculates the RMS of the quietest segment. Also detects clipping.
Captures audio from an RTSP stream, calculates RMS, THD, and SPL, and detects microphone overload.
:param rtsp_url: The RTSP stream URL.
:param bandpass_sos: Precomputed bandpass filter coefficients (Second-Order Sections).
:param sampling_rate: Sampling rate of the audio signal.
:param num_bins: Number of segments to divide the audio into.
:return: Tuple containing the RMS amplitude of the quietest segment and a boolean indicating clipping.
:return: Tuple containing the RMS amplitude, THD percentage, SPL value, and overload status.
"""
cmd = [
'ffmpeg',
@@ -524,67 +643,56 @@ def calculate_noise_rms(rtsp_url, bandpass_sos, num_bins=5):
'-vn',
'-f', 's16le',
'-acodec', 'pcm_s16le',
'-ar', '32000',
'-ar', str(sampling_rate),
'-ac', '1',
'-t', '5',
'-'
]
try:
debug(f"Starting audio capture from {rtsp_url}")
debug_print(f"Starting audio capture from {rtsp_url}")
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode != 0:
debug(f"ffmpeg failed with error: {stderr.decode()}")
return None, False
debug_print(f"ffmpeg failed with error: {stderr.decode()}")
return None, None, None, False
# Convert raw PCM data to numpy array
audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0
debug(f"Captured {len(audio)} samples from audio stream.")
debug_print(f"Captured {len(audio)} samples from audio stream.")
if len(audio) == 0:
debug("No audio data captured.")
return None, False
# Check for clipping
is_clipping = detect_clipping(audio)
debug_print("No audio data captured.")
return None, None, None, False
# Apply bandpass filter
filtered = sosfilt(bandpass_sos, audio)
debug("Applied bandpass filter to audio data.")
filtered_audio = sosfilt(bandpass_sos, audio)
debug_print("Applied bandpass filter to audio data.")
# Divide into num_bins
total_samples = len(filtered)
bin_size = total_samples // num_bins
# Calculate RMS
rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2))
if bin_size == 0:
debug("Bin size is 0; insufficient audio data.")
return 0.0, is_clipping
# Calculate THD
thd_percentage = thd_calculation(filtered_audio, sampling_rate)
trimmed_length = bin_size * num_bins
trimmed_filtered = filtered[:trimmed_length]
segments = trimmed_filtered.reshape(num_bins, bin_size)
debug(f"Divided audio into {num_bins} bins of {bin_size} samples each.")
# Calculate SPL
spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB)
# Calculate RMS for each segment
rms_values = np.sqrt(np.mean(segments ** 2, axis=1))
debug(f"Calculated RMS values for each segment: {rms_values}")
# Detect microphone overload
overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL)
# Return the minimum RMS value and clipping status
min_rms = rms_values.min()
debug(f"Minimum RMS value among segments: {min_rms}")
return min_rms, is_clipping
return rms_amplitude, thd_percentage, spl, overload
except Exception as e:
debug(f"Exception during noise RMS calculation: {e}")
return None, False
debug_print(f"Exception during audio processing: {e}")
return None, None, None, False
def main():
"""
Main loop that continuously monitors background noise, detects clipping, and adjusts microphone gain.
Main loop that continuously monitors background noise, detects clipping, calculates THD,
and adjusts microphone gain accordingly.
"""
TREND_COUNT = 0
PREVIOUS_TREND = 0
@@ -592,10 +700,11 @@ def main():
# Precompute the bandpass filter coefficients
LOWCUT = 2000 # Lower frequency bound in Hz
HIGHCUT = 8000 # Upper frequency bound in Hz
FILTER_ORDER = 5 # Order of the Butterworth filter
FILTER_ORDER = 5 # Order of the Butterworth filter
SAMPLING_RATE = 32000 # Sampling rate in Hz
sos = butter(FILTER_ORDER, [LOWCUT, HIGHCUT], btype='band', fs=44100, output='sos')
debug("Precomputed Butterworth bandpass filter coefficients.")
sos = butter(FILTER_ORDER, [LOWCUT, HIGHCUT], btype='band', fs=SAMPLING_RATE, output='sos')
debug_print("Precomputed Butterworth bandpass filter coefficients.")
# Set the microphone gain to the maximum gain at the start
success = set_gain_db(MICROPHONE_NAME, MAX_GAIN_DB)
@@ -606,48 +715,63 @@ def main():
return
while True:
min_rms, is_clipping = calculate_noise_rms(RTSP_URL, sos, num_bins=5)
rms, thd, spl, overload = calculate_noise_rms_and_thd(RTSP_URL, sos, SAMPLING_RATE)
if min_rms is None:
if rms is None:
print("Failed to compute noise RMS. Retrying in 1 minute...")
time.sleep(60)
continue
if not isinstance(min_rms, (float, int)):
print(f"Invalid noise RMS output detected: {min_rms}. Retrying in 1 minute...")
time.sleep(60)
continue
# Print the final converted RMS amplitude (only once)
print(f"Converted RMS Amplitude: {min_rms}")
debug(f"Current background noise (RMS amplitude): {min_rms}")
# Print the final converted RMS amplitude
print(f"Converted RMS Amplitude: {rms:.6f}")
debug_print(f"Current background noise (RMS amplitude): {rms:.6f}")
# Detect clipping and reduce gain if needed
CURRENT_GAIN_DB = get_gain_db(MICROPHONE_NAME)
if is_clipping:
NEW_GAIN_DB = CURRENT_GAIN_DB - CLIPPING_REDUCTION_DB
if NEW_GAIN_DB < MIN_GAIN_DB:
NEW_GAIN_DB = MIN_GAIN_DB
success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB)
if success:
print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB")
debug(f"Gain reduced to {NEW_GAIN_DB} dB due to clipping.")
else:
print("Failed to reduce gain due to clipping.")
if overload:
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = current_gain_db - CLIPPING_REDUCTION_DB
if NEW_GAIN_DB < MIN_GAIN_DB:
NEW_GAIN_DB = MIN_GAIN_DB
success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB)
if success:
print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to clipping.")
else:
print("Failed to reduce gain due to clipping.")
# Skip trend adjustment in case of clipping
time.sleep(60)
continue
# Handle THD if SPL is above a reasonable threshold
if spl >= THD_FUNDAMENTAL_THRESHOLD_DB:
if thd > MAX_THD_PERCENTAGE:
debug_print(f"High THD detected: {thd:.2f}%")
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = current_gain_db - DECREASE_GAIN_STEP_DB
if NEW_GAIN_DB < MIN_GAIN_DB:
NEW_GAIN_DB = MIN_GAIN_DB
success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB)
if success:
print(f"High THD detected. Decreased gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high THD.")
else:
print("Failed to adjust gain based on THD.")
else:
debug_print("THD within acceptable limits.")
else:
debug_print("SPL below THD calculation threshold. Skipping THD check.")
# Determine the noise trend
if min_rms > NOISE_THRESHOLD_HIGH:
if rms > NOISE_THRESHOLD_HIGH:
CURRENT_TREND = 1
elif min_rms < NOISE_THRESHOLD_LOW:
elif rms < NOISE_THRESHOLD_LOW:
CURRENT_TREND = -1
else:
CURRENT_TREND = 0
debug(f"Current trend: {CURRENT_TREND}")
debug_print(f"Current trend: {CURRENT_TREND}")
if CURRENT_TREND != 0:
if CURRENT_TREND == PREVIOUS_TREND:
@@ -658,34 +782,43 @@ def main():
else:
TREND_COUNT = 0
debug(f"Trend count: {TREND_COUNT}")
debug_print(f"Trend count: {TREND_COUNT}")
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is None:
print("Failed to get current gain level. Retrying in 1 minute...")
time.sleep(60)
continue
debug_print(f"Current gain: {current_gain_db} dB")
if TREND_COUNT >= TREND_COUNT_THRESHOLD:
if CURRENT_TREND == 1:
# Decrease gain by 1 dB
NEW_GAIN_DB = CURRENT_GAIN_DB - DECREASE_GAIN_STEP_DB
# Decrease gain by DECREASE_GAIN_STEP_DB dB
NEW_GAIN_DB = current_gain_db - DECREASE_GAIN_STEP_DB
if NEW_GAIN_DB < MIN_GAIN_DB:
NEW_GAIN_DB = MIN_GAIN_DB
success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB)
if success:
print(f"Decreased gain to {NEW_GAIN_DB} dB")
debug(f"Gain adjusted to {NEW_GAIN_DB} dB")
print(f"Background noise high. Decreased gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high noise.")
else:
print("Failed to set new gain.")
print("Failed to decrease gain.")
elif CURRENT_TREND == -1:
# Increase gain by 5 dB
NEW_GAIN_DB = CURRENT_GAIN_DB + INCREASE_GAIN_STEP_DB
# Increase gain by INCREASE_GAIN_STEP_DB dB
NEW_GAIN_DB = current_gain_db + INCREASE_GAIN_STEP_DB
if NEW_GAIN_DB > MAX_GAIN_DB:
NEW_GAIN_DB = MAX_GAIN_DB
success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB)
if success:
print(f"Increased gain to {NEW_GAIN_DB} dB")
debug(f"Gain adjusted to {NEW_GAIN_DB} dB")
print(f"Background noise low. Increased gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain increased to {NEW_GAIN_DB} dB due to low noise.")
else:
print("Failed to set new gain.")
print("Failed to increase gain.")
TREND_COUNT = 0
else:
debug("No gain adjustment needed.")
debug_print("No gain adjustment needed based on noise trend.")
# Sleep for 1 minute before the next iteration
time.sleep(60)