From 2ca05e3f38262fbaa14f05dc4325fdb5ae6c9abf Mon Sep 17 00:00:00 2001 From: Alexandre <44178713+alexbelgium@users.noreply.github.com> Date: Fri, 18 Oct 2024 12:00:17 +0200 Subject: [PATCH] Update DOCS.md --- birdnet-pi/DOCS.md | 327 +++++++++++++++++++++++++++++++-------------- 1 file changed, 230 insertions(+), 97 deletions(-) diff --git a/birdnet-pi/DOCS.md b/birdnet-pi/DOCS.md index 24fa74906..f65f9b771 100644 --- a/birdnet-pi/DOCS.md +++ b/birdnet-pi/DOCS.md @@ -391,11 +391,12 @@ Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py" ```python #!/usr/bin/env python3 """ -Microphone Gain Adjustment Script +Microphone Gain Adjustment Script with THD and Overload Detection This script captures audio from an RTSP stream, processes it to calculate the RMS -within the 2000-4000 Hz frequency band, detects clipping, and adjusts the microphone -gain based on predefined noise thresholds, trends, and clipping detection. +within the 2000-8000 Hz frequency band, detects clipping, calculates Total Harmonic +Distortion (THD), and adjusts the microphone gain based on predefined noise thresholds, +trends, and distortion metrics. Dependencies: - numpy @@ -404,12 +405,12 @@ Dependencies: - amixer (for microphone gain control) Author: OpenAI ChatGPT -Date: 2024-04-27 +Date: 2024-04-27 (Updated) """ import subprocess import numpy as np -from scipy.signal import butter, sosfilt +from scipy.signal import butter, sosfilt, find_peaks import time import re @@ -424,11 +425,11 @@ INCREASE_GAIN_STEP_DB = 5 # Gain increase step in dB CLIPPING_REDUCTION_DB = 3 # Reduction in dB if clipping is detected # Noise Thresholds -NOISE_THRESHOLD_HIGH = 0.001 # Upper threshold for noise RMS amplitude -NOISE_THRESHOLD_LOW = 0.00035 # Lower threshold for noise RMS amplitude +NOISE_THRESHOLD_HIGH = 0.001 # Upper threshold for noise RMS amplitude +NOISE_THRESHOLD_LOW = 0.00035 # Lower threshold for noise RMS amplitude # Trend Detection -TREND_COUNT_THRESHOLD = 1 # Number of consecutive trends needed to adjust gain +TREND_COUNT_THRESHOLD = 3 # Number of consecutive trends needed to adjust gain # RTSP Stream URL RTSP_URL = "rtsp://192.168.178.124:8554/birdmic" # Replace with your RTSP stream URL @@ -436,10 +437,21 @@ RTSP_URL = "rtsp://192.168.178.124:8554/birdmic" # Replace with your RTSP strea # Debug Mode (1 for enabled, 0 for disabled) DEBUG = 1 +# Microphone Characteristics +MIC_SENSITIVITY_DB = -28 # dB (0 dB = 1V/Pa) +MIC_CLIPPING_SPL = 120 # dB SPL at 1 kHz + +# Calibration Constants (These may need to be adjusted based on actual calibration) +REFERENCE_PRESSURE = 20e-6 # 20 µPa, standard reference for SPL + +# THD Settings +THD_FUNDAMENTAL_THRESHOLD_DB = 60 # Minimum SPL to consider THD calculation +MAX_THD_PERCENTAGE = 5.0 # Maximum acceptable THD percentage + # ----------------------------------------------------------------------- -def debug(msg): +def debug_print(msg): """ Prints debug messages if DEBUG mode is enabled. @@ -464,13 +476,13 @@ def get_gain_db(mic_name): match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output) if match: gain_db = float(match.group(1)) - debug(f"Retrieved gain: {gain_db} dB") + debug_print(f"Retrieved gain: {gain_db} dB") return gain_db else: - debug("No gain information found in amixer output.") + debug_print("No gain information found in amixer output.") return None except subprocess.CalledProcessError as e: - debug(f"amixer sget failed: {e}") + debug_print(f"amixer sget failed: {e}") return None @@ -485,36 +497,143 @@ def set_gain_db(mic_name, gain_db): cmd = ['amixer', 'sset', mic_name, f'{gain_db}dB'] try: subprocess.check_call(cmd, stderr=subprocess.STDOUT) - debug(f"Set gain to: {gain_db} dB") + debug_print(f"Set gain to: {gain_db} dB") return True except subprocess.CalledProcessError as e: - debug(f"amixer sset failed: {e}") + debug_print(f"amixer sset failed: {e}") return False -def detect_clipping(audio): +def find_fundamental_frequency(fft_freqs, fft_magnitude, min_freq=100, max_freq=5000): """ - Detects if clipping occurs in the audio signal. + Dynamically finds the fundamental frequency within a specified range. + + :param fft_freqs: Array of frequency bins from FFT. + :param fft_magnitude: Magnitude spectrum from FFT. + :param min_freq: Minimum frequency to search for the fundamental. + :param max_freq: Maximum frequency to search for the fundamental. + :return: Fundamental frequency in Hz and its amplitude. + """ + # Limit search to the specified frequency range + idx_min = np.searchsorted(fft_freqs, min_freq) + idx_max = np.searchsorted(fft_freqs, max_freq) + if idx_max <= idx_min: + return None, 0 + + search_magnitude = fft_magnitude[idx_min:idx_max] + search_freqs = fft_freqs[idx_min:idx_max] + + # Find peaks in the magnitude spectrum + peaks, properties = find_peaks(search_magnitude, height=np.max(search_magnitude) * 0.1) + if len(peaks) == 0: + return None, 0 + + # Identify the peak with the highest magnitude + peak_heights = properties['peak_heights'] + max_peak_idx = np.argmax(peak_heights) + fundamental_freq = search_freqs[peaks[max_peak_idx]] + fundamental_amplitude = search_magnitude[peaks[max_peak_idx]] + + debug_print(f"Detected fundamental frequency: {fundamental_freq:.2f} Hz with amplitude {fundamental_amplitude:.4f}") + return fundamental_freq, fundamental_amplitude + + +def thd_calculation(audio, sampling_rate, num_harmonics=5): + """ + Calculates Total Harmonic Distortion (THD) for the audio signal. :param audio: The audio signal as a numpy array. - :return: True if clipping is detected, False otherwise. + :param sampling_rate: Sampling rate of the audio signal. + :param num_harmonics: Number of harmonics to include in THD calculation. + :return: THD value in percentage. """ - CLIPPING_THRESHOLD = 1.0 # Normalized PCM16 max value is ±1.0 - if np.any(audio >= CLIPPING_THRESHOLD) or np.any(audio <= -CLIPPING_THRESHOLD): - debug("Clipping detected in audio signal.") + # FFT analysis + fft_vals = np.fft.rfft(audio) + fft_freqs = np.fft.rfftfreq(len(audio), 1 / sampling_rate) + fft_magnitude = np.abs(fft_vals) + + # Dynamically find the fundamental frequency + fundamental_freq, fundamental_amplitude = find_fundamental_frequency(fft_freqs, fft_magnitude) + + if fundamental_freq is None or fundamental_amplitude < 1e-6: + debug_print("Fundamental frequency not detected or amplitude too low. Skipping THD calculation.") + return 0.0 + + # Calculate harmonic amplitudes + harmonic_amplitudes = [] + for n in range(2, num_harmonics + 1): + harmonic_freq = n * fundamental_freq + if harmonic_freq > sampling_rate / 2: + break # Skip harmonics beyond Nyquist frequency + + # Find the closest frequency bin + harmonic_idx = np.argmin(np.abs(fft_freqs - harmonic_freq)) + harmonic_amp = fft_magnitude[harmonic_idx] + harmonic_amplitudes.append(harmonic_amp) + debug_print(f"Harmonic {n} frequency: {harmonic_freq:.2f} Hz, amplitude: {harmonic_amp:.4f}") + + # Calculate THD + harmonic_sum = np.sqrt(np.sum(np.square(harmonic_amplitudes))) + if fundamental_amplitude == 0: + thd = 0.0 + else: + thd = (harmonic_sum / fundamental_amplitude) * 100 # THD in percentage + + debug_print(f"THD Calculation: {thd:.2f}%") + return thd + + +def calculate_spl(audio, mic_sensitivity_db): + """ + Calculates the Sound Pressure Level (SPL) from the audio signal. + + :param audio: The audio signal as a numpy array. + :param mic_sensitivity_db: Microphone sensitivity in dB (0 dB = 1V/Pa). + :return: SPL in dB. + """ + # Calculate RMS amplitude + rms_amplitude = np.sqrt(np.mean(audio ** 2)) + if rms_amplitude == 0: + debug_print("RMS amplitude is zero. SPL cannot be calculated.") + return -np.inf + + # Convert RMS amplitude to voltage + # Assuming audio is normalized between -1 and 1, representing the actual voltage would require calibration + # For demonstration, we'll proceed with the given sensitivity + + # Convert voltage to pressure (Pa) + mic_sensitivity_linear = 10 ** (mic_sensitivity_db / 20) # V/Pa + pressure = rms_amplitude / mic_sensitivity_linear # Pa + + # Calculate SPL + spl = 20 * np.log10(pressure / REFERENCE_PRESSURE) + debug_print(f"Calculated SPL: {spl:.2f} dB") + return spl + + +def detect_microphone_overload(spl, mic_clipping_spl): + """ + Detects if the calculated SPL is approaching the microphone's clipping SPL. + + :param spl: The calculated SPL. + :param mic_clipping_spl: The microphone's clipping SPL. + :return: True if overload is detected, False otherwise. + """ + if spl >= mic_clipping_spl - 3: # Consider overload if within 3 dB of clipping SPL + debug_print("Microphone overload detected.") return True return False -def calculate_noise_rms(rtsp_url, bandpass_sos, num_bins=5): +def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins=5): """ - Captures audio from an RTSP stream, applies a bandpass filter, divides the - audio into segments, and calculates the RMS of the quietest segment. Also detects clipping. + Captures audio from an RTSP stream, calculates RMS, THD, and SPL, and detects microphone overload. :param rtsp_url: The RTSP stream URL. :param bandpass_sos: Precomputed bandpass filter coefficients (Second-Order Sections). + :param sampling_rate: Sampling rate of the audio signal. :param num_bins: Number of segments to divide the audio into. - :return: Tuple containing the RMS amplitude of the quietest segment and a boolean indicating clipping. + :return: Tuple containing the RMS amplitude, THD percentage, SPL value, and overload status. """ cmd = [ 'ffmpeg', @@ -524,67 +643,56 @@ def calculate_noise_rms(rtsp_url, bandpass_sos, num_bins=5): '-vn', '-f', 's16le', '-acodec', 'pcm_s16le', - '-ar', '32000', + '-ar', str(sampling_rate), '-ac', '1', '-t', '5', '-' ] try: - debug(f"Starting audio capture from {rtsp_url}") + debug_print(f"Starting audio capture from {rtsp_url}") process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = process.communicate() if process.returncode != 0: - debug(f"ffmpeg failed with error: {stderr.decode()}") - return None, False + debug_print(f"ffmpeg failed with error: {stderr.decode()}") + return None, None, None, False # Convert raw PCM data to numpy array audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0 - debug(f"Captured {len(audio)} samples from audio stream.") + debug_print(f"Captured {len(audio)} samples from audio stream.") if len(audio) == 0: - debug("No audio data captured.") - return None, False - - # Check for clipping - is_clipping = detect_clipping(audio) + debug_print("No audio data captured.") + return None, None, None, False # Apply bandpass filter - filtered = sosfilt(bandpass_sos, audio) - debug("Applied bandpass filter to audio data.") + filtered_audio = sosfilt(bandpass_sos, audio) + debug_print("Applied bandpass filter to audio data.") - # Divide into num_bins - total_samples = len(filtered) - bin_size = total_samples // num_bins + # Calculate RMS + rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2)) - if bin_size == 0: - debug("Bin size is 0; insufficient audio data.") - return 0.0, is_clipping + # Calculate THD + thd_percentage = thd_calculation(filtered_audio, sampling_rate) - trimmed_length = bin_size * num_bins - trimmed_filtered = filtered[:trimmed_length] - segments = trimmed_filtered.reshape(num_bins, bin_size) - debug(f"Divided audio into {num_bins} bins of {bin_size} samples each.") + # Calculate SPL + spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB) - # Calculate RMS for each segment - rms_values = np.sqrt(np.mean(segments ** 2, axis=1)) - debug(f"Calculated RMS values for each segment: {rms_values}") + # Detect microphone overload + overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL) - # Return the minimum RMS value and clipping status - min_rms = rms_values.min() - debug(f"Minimum RMS value among segments: {min_rms}") - - return min_rms, is_clipping + return rms_amplitude, thd_percentage, spl, overload except Exception as e: - debug(f"Exception during noise RMS calculation: {e}") - return None, False + debug_print(f"Exception during audio processing: {e}") + return None, None, None, False def main(): """ - Main loop that continuously monitors background noise, detects clipping, and adjusts microphone gain. + Main loop that continuously monitors background noise, detects clipping, calculates THD, + and adjusts microphone gain accordingly. """ TREND_COUNT = 0 PREVIOUS_TREND = 0 @@ -592,10 +700,11 @@ def main(): # Precompute the bandpass filter coefficients LOWCUT = 2000 # Lower frequency bound in Hz HIGHCUT = 8000 # Upper frequency bound in Hz - FILTER_ORDER = 5 # Order of the Butterworth filter + FILTER_ORDER = 5 # Order of the Butterworth filter + SAMPLING_RATE = 32000 # Sampling rate in Hz - sos = butter(FILTER_ORDER, [LOWCUT, HIGHCUT], btype='band', fs=44100, output='sos') - debug("Precomputed Butterworth bandpass filter coefficients.") + sos = butter(FILTER_ORDER, [LOWCUT, HIGHCUT], btype='band', fs=SAMPLING_RATE, output='sos') + debug_print("Precomputed Butterworth bandpass filter coefficients.") # Set the microphone gain to the maximum gain at the start success = set_gain_db(MICROPHONE_NAME, MAX_GAIN_DB) @@ -606,48 +715,63 @@ def main(): return while True: - min_rms, is_clipping = calculate_noise_rms(RTSP_URL, sos, num_bins=5) + rms, thd, spl, overload = calculate_noise_rms_and_thd(RTSP_URL, sos, SAMPLING_RATE) - if min_rms is None: + if rms is None: print("Failed to compute noise RMS. Retrying in 1 minute...") time.sleep(60) continue - if not isinstance(min_rms, (float, int)): - print(f"Invalid noise RMS output detected: {min_rms}. Retrying in 1 minute...") - time.sleep(60) - continue - - # Print the final converted RMS amplitude (only once) - print(f"Converted RMS Amplitude: {min_rms}") - debug(f"Current background noise (RMS amplitude): {min_rms}") + # Print the final converted RMS amplitude + print(f"Converted RMS Amplitude: {rms:.6f}") + debug_print(f"Current background noise (RMS amplitude): {rms:.6f}") # Detect clipping and reduce gain if needed - CURRENT_GAIN_DB = get_gain_db(MICROPHONE_NAME) - - if is_clipping: - NEW_GAIN_DB = CURRENT_GAIN_DB - CLIPPING_REDUCTION_DB - if NEW_GAIN_DB < MIN_GAIN_DB: - NEW_GAIN_DB = MIN_GAIN_DB - success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB) - if success: - print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB") - debug(f"Gain reduced to {NEW_GAIN_DB} dB due to clipping.") - else: - print("Failed to reduce gain due to clipping.") + if overload: + current_gain_db = get_gain_db(MICROPHONE_NAME) + if current_gain_db is not None: + NEW_GAIN_DB = current_gain_db - CLIPPING_REDUCTION_DB + if NEW_GAIN_DB < MIN_GAIN_DB: + NEW_GAIN_DB = MIN_GAIN_DB + success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB) + if success: + print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB") + debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to clipping.") + else: + print("Failed to reduce gain due to clipping.") # Skip trend adjustment in case of clipping time.sleep(60) continue + # Handle THD if SPL is above a reasonable threshold + if spl >= THD_FUNDAMENTAL_THRESHOLD_DB: + if thd > MAX_THD_PERCENTAGE: + debug_print(f"High THD detected: {thd:.2f}%") + current_gain_db = get_gain_db(MICROPHONE_NAME) + if current_gain_db is not None: + NEW_GAIN_DB = current_gain_db - DECREASE_GAIN_STEP_DB + if NEW_GAIN_DB < MIN_GAIN_DB: + NEW_GAIN_DB = MIN_GAIN_DB + success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB) + if success: + print(f"High THD detected. Decreased gain to {NEW_GAIN_DB} dB") + debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high THD.") + else: + print("Failed to adjust gain based on THD.") + else: + debug_print("THD within acceptable limits.") + else: + debug_print("SPL below THD calculation threshold. Skipping THD check.") + # Determine the noise trend - if min_rms > NOISE_THRESHOLD_HIGH: + if rms > NOISE_THRESHOLD_HIGH: CURRENT_TREND = 1 - elif min_rms < NOISE_THRESHOLD_LOW: + elif rms < NOISE_THRESHOLD_LOW: CURRENT_TREND = -1 else: CURRENT_TREND = 0 - debug(f"Current trend: {CURRENT_TREND}") + debug_print(f"Current trend: {CURRENT_TREND}") if CURRENT_TREND != 0: if CURRENT_TREND == PREVIOUS_TREND: @@ -658,34 +782,43 @@ def main(): else: TREND_COUNT = 0 - debug(f"Trend count: {TREND_COUNT}") + debug_print(f"Trend count: {TREND_COUNT}") + + current_gain_db = get_gain_db(MICROPHONE_NAME) + + if current_gain_db is None: + print("Failed to get current gain level. Retrying in 1 minute...") + time.sleep(60) + continue + + debug_print(f"Current gain: {current_gain_db} dB") if TREND_COUNT >= TREND_COUNT_THRESHOLD: if CURRENT_TREND == 1: - # Decrease gain by 1 dB - NEW_GAIN_DB = CURRENT_GAIN_DB - DECREASE_GAIN_STEP_DB + # Decrease gain by DECREASE_GAIN_STEP_DB dB + NEW_GAIN_DB = current_gain_db - DECREASE_GAIN_STEP_DB if NEW_GAIN_DB < MIN_GAIN_DB: NEW_GAIN_DB = MIN_GAIN_DB success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB) if success: - print(f"Decreased gain to {NEW_GAIN_DB} dB") - debug(f"Gain adjusted to {NEW_GAIN_DB} dB") + print(f"Background noise high. Decreased gain to {NEW_GAIN_DB} dB") + debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high noise.") else: - print("Failed to set new gain.") + print("Failed to decrease gain.") elif CURRENT_TREND == -1: - # Increase gain by 5 dB - NEW_GAIN_DB = CURRENT_GAIN_DB + INCREASE_GAIN_STEP_DB + # Increase gain by INCREASE_GAIN_STEP_DB dB + NEW_GAIN_DB = current_gain_db + INCREASE_GAIN_STEP_DB if NEW_GAIN_DB > MAX_GAIN_DB: NEW_GAIN_DB = MAX_GAIN_DB success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB) if success: - print(f"Increased gain to {NEW_GAIN_DB} dB") - debug(f"Gain adjusted to {NEW_GAIN_DB} dB") + print(f"Background noise low. Increased gain to {NEW_GAIN_DB} dB") + debug_print(f"Gain increased to {NEW_GAIN_DB} dB due to low noise.") else: - print("Failed to set new gain.") + print("Failed to increase gain.") TREND_COUNT = 0 else: - debug("No gain adjustment needed.") + debug_print("No gain adjustment needed based on noise trend.") # Sleep for 1 minute before the next iteration time.sleep(60)