From 06c2811da09bf74dc61f87793dce8f31854b7339 Mon Sep 17 00:00:00 2001 From: Alexandre <44178713+alexbelgium@users.noreply.github.com> Date: Sun, 27 Oct 2024 16:57:09 +0100 Subject: [PATCH] Update DOCS.md --- birdnet-pi/DOCS.md | 191 ++++++++++++++------------------------------- 1 file changed, 58 insertions(+), 133 deletions(-) diff --git a/birdnet-pi/DOCS.md b/birdnet-pi/DOCS.md index f70934e14..8b7649610 100644 --- a/birdnet-pi/DOCS.md +++ b/birdnet-pi/DOCS.md @@ -430,13 +430,13 @@ Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py" ```python #!/usr/bin/env python3 + """ -Microphone Gain Adjustment Script with THD and Overload Detection +Microphone Gain Adjustment Script with Clipping and Overload Detection This script captures audio from an RTSP stream, processes it to calculate the RMS -within the 2000-8000 Hz frequency band, detects clipping, calculates Total Harmonic -Distortion (THD), and adjusts the microphone gain based on predefined noise thresholds, -trends, and distortion metrics. +within the 2000-8000 Hz frequency band, detects clipping, calculates Sound Pressure Level (SPL), +and adjusts the microphone gain based on predefined noise thresholds, trends, and overload metrics. Dependencies: - numpy @@ -444,13 +444,29 @@ Dependencies: - ffmpeg (installed and accessible in PATH) - amixer (for microphone gain control) -Author: OpenAI ChatGPT -Date: 2024-04-27 (Updated) +Author: alexbelgium +Date: 27-Oct-2024 + +Changelog: +----------- +2024-04-27: Initial version +- Implemented basic microphone gain adjustment based on RMS levels and Total Harmonic Distortion (THD) calculations. +- Introduced overload detection based on Sound Pressure Level (SPL). + +2024-10-27: Updated for simplified noise and clipping detection +- Removed THD calculations, as natural bird harmonics affect the distortion metric. +- Introduced direct clipping detection by analyzing audio sample amplitudes. +- Refocused the gain adjustment criteria on RMS amplitude and SPL within the target band (2000-8000 Hz). +- Simplified main loop to focus on RMS, SPL, and clipping instead of THD. +- Added `detect_clipping` function to identify clipping events. +- Updated debug logging to enhance traceability and include SPL measurements. +- Adjusted trend detection logic for more responsive gain adjustment. + """ import subprocess import numpy as np -from scipy.signal import butter, sosfilt, find_peaks +from scipy.signal import butter, sosfilt import time import re @@ -484,10 +500,6 @@ MIC_CLIPPING_SPL = 120 # dB SPL at 1 kHz # Calibration Constants (These may need to be adjusted based on actual calibration) REFERENCE_PRESSURE = 20e-6 # 20 µPa, standard reference for SPL -# THD Settings -THD_FUNDAMENTAL_THRESHOLD_DB = 60 # Minimum SPL to consider THD calculation -MAX_THD_PERCENTAGE = 5.0 # Maximum acceptable THD percentage - # ----------------------------------------------------------------------- @@ -544,85 +556,6 @@ def set_gain_db(mic_name, gain_db): return False -def find_fundamental_frequency(fft_freqs, fft_magnitude, min_freq=1000, max_freq=8000): - """ - Dynamically finds the fundamental frequency within a specified range. - - :param fft_freqs: Array of frequency bins from FFT. - :param fft_magnitude: Magnitude spectrum from FFT. - :param min_freq: Minimum frequency to search for the fundamental. - :param max_freq: Maximum frequency to search for the fundamental. - :return: Fundamental frequency in Hz and its amplitude. - """ - # Limit search to the specified frequency range - idx_min = np.searchsorted(fft_freqs, min_freq) - idx_max = np.searchsorted(fft_freqs, max_freq) - if idx_max <= idx_min: - return None, 0 - - search_magnitude = fft_magnitude[idx_min:idx_max] - search_freqs = fft_freqs[idx_min:idx_max] - - # Find peaks in the magnitude spectrum - peaks, properties = find_peaks(search_magnitude, height=np.max(search_magnitude) * 0.1) - if len(peaks) == 0: - return None, 0 - - # Identify the peak with the highest magnitude - peak_heights = properties['peak_heights'] - max_peak_idx = np.argmax(peak_heights) - fundamental_freq = search_freqs[peaks[max_peak_idx]] - fundamental_amplitude = search_magnitude[peaks[max_peak_idx]] - - debug_print(f"Detected fundamental frequency: {fundamental_freq:.2f} Hz with amplitude {fundamental_amplitude:.4f}") - return fundamental_freq, fundamental_amplitude - - -def thd_calculation(audio, sampling_rate, num_harmonics=5): - """ - Calculates Total Harmonic Distortion (THD) for the audio signal. - - :param audio: The audio signal as a numpy array. - :param sampling_rate: Sampling rate of the audio signal. - :param num_harmonics: Number of harmonics to include in THD calculation. - :return: THD value in percentage. - """ - # FFT analysis - fft_vals = np.fft.rfft(audio) - fft_freqs = np.fft.rfftfreq(len(audio), 1 / sampling_rate) - fft_magnitude = np.abs(fft_vals) - - # Dynamically find the fundamental frequency - fundamental_freq, fundamental_amplitude = find_fundamental_frequency(fft_freqs, fft_magnitude) - - if fundamental_freq is None or fundamental_amplitude < 1e-6: - debug_print("Fundamental frequency not detected or amplitude too low. Skipping THD calculation.") - return 0.0 - - # Calculate harmonic amplitudes - harmonic_amplitudes = [] - for n in range(2, num_harmonics + 1): - harmonic_freq = n * fundamental_freq - if harmonic_freq > sampling_rate / 2: - break # Skip harmonics beyond Nyquist frequency - - # Find the closest frequency bin - harmonic_idx = np.argmin(np.abs(fft_freqs - harmonic_freq)) - harmonic_amp = fft_magnitude[harmonic_idx] - harmonic_amplitudes.append(harmonic_amp) - debug_print(f"Harmonic {n} frequency: {harmonic_freq:.2f} Hz, amplitude: {harmonic_amp:.4f}") - - # Calculate THD - harmonic_sum = np.sqrt(np.sum(np.square(harmonic_amplitudes))) - if fundamental_amplitude == 0: - thd = 0.0 - else: - thd = (harmonic_sum / fundamental_amplitude) * 100 # THD in percentage - - debug_print(f"THD Calculation: {thd:.2f}%") - return thd - - def calculate_spl(audio, mic_sensitivity_db): """ Calculates the Sound Pressure Level (SPL) from the audio signal. @@ -638,9 +571,7 @@ def calculate_spl(audio, mic_sensitivity_db): return -np.inf # Convert RMS amplitude to voltage - # Assuming audio is normalized between -1 and 1, representing the actual voltage would require calibration - # For demonstration, we'll proceed with the given sensitivity - + # Assuming audio is normalized between -1 and 1 # Convert voltage to pressure (Pa) mic_sensitivity_linear = 10 ** (mic_sensitivity_db / 20) # V/Pa pressure = rms_amplitude / mic_sensitivity_linear # Pa @@ -665,15 +596,28 @@ def detect_microphone_overload(spl, mic_clipping_spl): return False -def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins=5): +def detect_clipping(audio): """ - Captures audio from an RTSP stream, calculates RMS, THD, and SPL, and detects microphone overload. + Detects if clipping has occurred in the audio signal. + + :param audio: The audio signal as a numpy array. + :return: True if clipping is detected, False otherwise. + """ + max_amplitude = np.max(np.abs(audio)) + if max_amplitude >= 1.0: + debug_print("Clipping detected in audio signal.") + return True + return False + + +def calculate_noise_rms_and_spl(rtsp_url, bandpass_sos, sampling_rate): + """ + Captures audio from an RTSP stream, calculates RMS, SPL, and detects microphone overload. :param rtsp_url: The RTSP stream URL. :param bandpass_sos: Precomputed bandpass filter coefficients (Second-Order Sections). :param sampling_rate: Sampling rate of the audio signal. - :param num_bins: Number of segments to divide the audio into. - :return: Tuple containing the RMS amplitude, THD percentage, SPL value, and overload status. + :return: Tuple containing the RMS amplitude, SPL value, overload status, and clipping status. """ cmd = [ 'ffmpeg', @@ -696,7 +640,7 @@ def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins= if process.returncode != 0: debug_print(f"ffmpeg failed with error: {stderr.decode()}") - return None, None, None, False + return None, None, False, False # Convert raw PCM data to numpy array audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0 @@ -704,7 +648,10 @@ def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins= if len(audio) == 0: debug_print("No audio data captured.") - return None, None, None, False + return None, None, False, False + + # Detect clipping + clipping = detect_clipping(audio) # Apply bandpass filter filtered_audio = sosfilt(bandpass_sos, audio) @@ -713,25 +660,22 @@ def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins= # Calculate RMS rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2)) - # Calculate THD - thd_percentage = thd_calculation(filtered_audio, sampling_rate) - # Calculate SPL spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB) # Detect microphone overload overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL) - return rms_amplitude, thd_percentage, spl, overload + return rms_amplitude, spl, overload, clipping except Exception as e: debug_print(f"Exception during audio processing: {e}") - return None, None, None, False + return None, None, False, False def main(): """ - Main loop that continuously monitors background noise, detects clipping, calculates THD, + Main loop that continuously monitors background noise, detects clipping, and adjusts microphone gain accordingly. """ TREND_COUNT = 0 @@ -755,19 +699,20 @@ def main(): return while True: - rms, thd, spl, overload = calculate_noise_rms_and_thd(RTSP_URL, sos, SAMPLING_RATE) + rms, spl, overload, clipping = calculate_noise_rms_and_spl(RTSP_URL, sos, SAMPLING_RATE) if rms is None: print("Failed to compute noise RMS. Retrying in 1 minute...") time.sleep(60) continue - # Print the final converted RMS amplitude - print(f"Converted RMS Amplitude: {rms:.6f}") + # Print the final RMS amplitude + print(f"RMS Amplitude: {rms:.6f}") debug_print(f"Current background noise (RMS amplitude): {rms:.6f}") + debug_print(f"Calculated SPL: {spl:.2f} dB") # Detect clipping and reduce gain if needed - if overload: + if overload or clipping: current_gain_db = get_gain_db(MICROPHONE_NAME) if current_gain_db is not None: NEW_GAIN_DB = current_gain_db - CLIPPING_REDUCTION_DB @@ -775,34 +720,14 @@ def main(): NEW_GAIN_DB = MIN_GAIN_DB success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB) if success: - print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB") - debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to clipping.") + print(f"Overload or clipping detected. Reduced gain to {NEW_GAIN_DB} dB") + debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to overload or clipping.") else: - print("Failed to reduce gain due to clipping.") - # Skip trend adjustment in case of clipping + print("Failed to reduce gain due to overload or clipping.") + # Skip trend adjustment in case of overload or clipping time.sleep(60) continue - # Handle THD if SPL is above a reasonable threshold - if spl >= THD_FUNDAMENTAL_THRESHOLD_DB: - if thd > MAX_THD_PERCENTAGE: - debug_print(f"High THD detected: {thd:.2f}%") - current_gain_db = get_gain_db(MICROPHONE_NAME) - if current_gain_db is not None: - NEW_GAIN_DB = current_gain_db - DECREASE_GAIN_STEP_DB - if NEW_GAIN_DB < MIN_GAIN_DB: - NEW_GAIN_DB = MIN_GAIN_DB - success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB) - if success: - print(f"High THD detected. Decreased gain to {NEW_GAIN_DB} dB") - debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high THD.") - else: - print("Failed to adjust gain based on THD.") - else: - debug_print("THD within acceptable limits.") - else: - debug_print("SPL below THD calculation threshold. Skipping THD check.") - # Determine the noise trend if rms > NOISE_THRESHOLD_HIGH: CURRENT_TREND = 1