diff --git a/birdnet-pi/DOCS.md b/birdnet-pi/DOCS.md index 8b7649610..de5c405b7 100644 --- a/birdnet-pi/DOCS.md +++ b/birdnet-pi/DOCS.md @@ -429,266 +429,41 @@ echo "Mono optimization applied. Only using primary input and balanced outputs." Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py" ```python -#!/usr/bin/env python3 + # Calculate RMS + rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2)) -""" -Microphone Gain Adjustment Script with Clipping and Overload Detection + # Calculate THD over the full range + thd_percentage = thd_calculation(filtered_audio, sampling_rate) -This script captures audio from an RTSP stream, processes it to calculate the RMS -within the 2000-8000 Hz frequency band, detects clipping, calculates Sound Pressure Level (SPL), -and adjusts the microphone gain based on predefined noise thresholds, trends, and overload metrics. + # Calculate SPL + spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB) -Dependencies: -- numpy -- scipy -- ffmpeg (installed and accessible in PATH) -- amixer (for microphone gain control) + # Detect microphone overload + overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL) -Author: alexbelgium -Date: 27-Oct-2024 + return rms_amplitude, thd_percentage, spl, overload -Changelog: ------------ -2024-04-27: Initial version -- Implemented basic microphone gain adjustment based on RMS levels and Total Harmonic Distortion (THD) calculations. -- Introduced overload detection based on Sound Pressure Level (SPL). + except Exception as e: + debug_print(f"Exception during audio processing: {e}", "error") + time.sleep(5) # Small delay before retrying -2024-10-27: Updated for simplified noise and clipping detection -- Removed THD calculations, as natural bird harmonics affect the distortion metric. -- Introduced direct clipping detection by analyzing audio sample amplitudes. -- Refocused the gain adjustment criteria on RMS amplitude and SPL within the target band (2000-8000 Hz). -- Simplified main loop to focus on RMS, SPL, and clipping instead of THD. -- Added `detect_clipping` function to identify clipping events. -- Updated debug logging to enhance traceability and include SPL measurements. -- Adjusted trend detection logic for more responsive gain adjustment. - -""" - -import subprocess -import numpy as np -from scipy.signal import butter, sosfilt -import time -import re - -# ---------------------------- Configuration ---------------------------- - -# Microphone Settings -MICROPHONE_NAME = "Line In 1 Gain" # Adjust to match your microphone's control name -MIN_GAIN_DB = 20 # Minimum gain in dB -MAX_GAIN_DB = 45 # Maximum gain in dB -DECREASE_GAIN_STEP_DB = 1 # Gain decrease step in dB -INCREASE_GAIN_STEP_DB = 5 # Gain increase step in dB -CLIPPING_REDUCTION_DB = 3 # Reduction in dB if clipping is detected - -# Noise Thresholds -NOISE_THRESHOLD_HIGH = 0.001 # Upper threshold for noise RMS amplitude -NOISE_THRESHOLD_LOW = 0.00035 # Lower threshold for noise RMS amplitude - -# Trend Detection -TREND_COUNT_THRESHOLD = 3 # Number of consecutive trends needed to adjust gain - -# RTSP Stream URL -RTSP_URL = "rtsp://192.168.178.124:8554/birdmic" # Replace with your RTSP stream URL - -# Debug Mode (1 for enabled, 0 for disabled) -DEBUG = 1 - -# Microphone Characteristics -MIC_SENSITIVITY_DB = -28 # dB (0 dB = 1V/Pa) -MIC_CLIPPING_SPL = 120 # dB SPL at 1 kHz - -# Calibration Constants (These may need to be adjusted based on actual calibration) -REFERENCE_PRESSURE = 20e-6 # 20 µPa, standard reference for SPL - -# ----------------------------------------------------------------------- - - -def debug_print(msg): - """ - Prints debug messages if DEBUG mode is enabled. - - :param msg: The debug message to print. - """ - if DEBUG: - current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - print(f"[{current_time}] [DEBUG] {msg}") - - -def get_gain_db(mic_name): - """ - Retrieves the current gain setting of the specified microphone using amixer. - - :param mic_name: The name of the microphone control in amixer. - :return: The current gain in dB as a float, or None if retrieval fails. - """ - cmd = ['amixer', 'sget', mic_name] - try: - output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode() - # Regex to find patterns like [30.00dB] - match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output) - if match: - gain_db = float(match.group(1)) - debug_print(f"Retrieved gain: {gain_db} dB") - return gain_db - else: - debug_print("No gain information found in amixer output.") - return None - except subprocess.CalledProcessError as e: - debug_print(f"amixer sget failed: {e}") - return None - - -def set_gain_db(mic_name, gain_db): - """ - Sets the gain of the specified microphone using amixer. - - :param mic_name: The name of the microphone control in amixer. - :param gain_db: The desired gain in dB. - :return: True if the gain was set successfully, False otherwise. - """ - cmd = ['amixer', 'sset', mic_name, f'{gain_db}dB'] - try: - subprocess.check_call(cmd, stderr=subprocess.STDOUT) - debug_print(f"Set gain to: {gain_db} dB") - return True - except subprocess.CalledProcessError as e: - debug_print(f"amixer sset failed: {e}") - return False - - -def calculate_spl(audio, mic_sensitivity_db): - """ - Calculates the Sound Pressure Level (SPL) from the audio signal. - - :param audio: The audio signal as a numpy array. - :param mic_sensitivity_db: Microphone sensitivity in dB (0 dB = 1V/Pa). - :return: SPL in dB. - """ - # Calculate RMS amplitude - rms_amplitude = np.sqrt(np.mean(audio ** 2)) - if rms_amplitude == 0: - debug_print("RMS amplitude is zero. SPL cannot be calculated.") - return -np.inf - - # Convert RMS amplitude to voltage - # Assuming audio is normalized between -1 and 1 - # Convert voltage to pressure (Pa) - mic_sensitivity_linear = 10 ** (mic_sensitivity_db / 20) # V/Pa - pressure = rms_amplitude / mic_sensitivity_linear # Pa - - # Calculate SPL - spl = 20 * np.log10(pressure / REFERENCE_PRESSURE) - debug_print(f"Calculated SPL: {spl:.2f} dB") - return spl - - -def detect_microphone_overload(spl, mic_clipping_spl): - """ - Detects if the calculated SPL is approaching the microphone's clipping SPL. - - :param spl: The calculated SPL. - :param mic_clipping_spl: The microphone's clipping SPL. - :return: True if overload is detected, False otherwise. - """ - if spl >= mic_clipping_spl - 3: # Consider overload if within 3 dB of clipping SPL - debug_print("Microphone overload detected.") - return True - return False - - -def detect_clipping(audio): - """ - Detects if clipping has occurred in the audio signal. - - :param audio: The audio signal as a numpy array. - :return: True if clipping is detected, False otherwise. - """ - max_amplitude = np.max(np.abs(audio)) - if max_amplitude >= 1.0: - debug_print("Clipping detected in audio signal.") - return True - return False - - -def calculate_noise_rms_and_spl(rtsp_url, bandpass_sos, sampling_rate): - """ - Captures audio from an RTSP stream, calculates RMS, SPL, and detects microphone overload. - - :param rtsp_url: The RTSP stream URL. - :param bandpass_sos: Precomputed bandpass filter coefficients (Second-Order Sections). - :param sampling_rate: Sampling rate of the audio signal. - :return: Tuple containing the RMS amplitude, SPL value, overload status, and clipping status. - """ - cmd = [ - 'ffmpeg', - '-loglevel', 'error', - '-rtsp_transport', 'tcp', - '-i', rtsp_url, - '-vn', - '-f', 's16le', - '-acodec', 'pcm_s16le', - '-ar', str(sampling_rate), - '-ac', '1', - '-t', '5', - '-' - ] - - try: - debug_print(f"Starting audio capture from {rtsp_url}") - process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - - if process.returncode != 0: - debug_print(f"ffmpeg failed with error: {stderr.decode()}") - return None, None, False, False - - # Convert raw PCM data to numpy array - audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0 - debug_print(f"Captured {len(audio)} samples from audio stream.") - - if len(audio) == 0: - debug_print("No audio data captured.") - return None, None, False, False - - # Detect clipping - clipping = detect_clipping(audio) - - # Apply bandpass filter - filtered_audio = sosfilt(bandpass_sos, audio) - debug_print("Applied bandpass filter to audio data.") - - # Calculate RMS - rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2)) - - # Calculate SPL - spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB) - - # Detect microphone overload - overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL) - - return rms_amplitude, spl, overload, clipping - - except Exception as e: - debug_print(f"Exception during audio processing: {e}") - return None, None, False, False + return None, None, None, False def main(): """ - Main loop that continuously monitors background noise, detects clipping, - and adjusts microphone gain accordingly. + Main loop that continuously monitors background noise, detects clipping, calculates THD, + and adjusts microphone gain with stabilization delay and retry logic for RTSP stream resilience. """ TREND_COUNT = 0 PREVIOUS_TREND = 0 + last_gain_adjustment_time = time.time() - GAIN_STABILIZATION_DELAY # Initialize - # Precompute the bandpass filter coefficients - LOWCUT = 2000 # Lower frequency bound in Hz - HIGHCUT = 8000 # Upper frequency bound in Hz - FILTER_ORDER = 5 # Order of the Butterworth filter - SAMPLING_RATE = 32000 # Sampling rate in Hz - + # Precompute bandpass filter coefficients with updated SAMPLING_RATE + LOWCUT = 2000 + HIGHCUT = 8000 + FILTER_ORDER = 5 sos = butter(FILTER_ORDER, [LOWCUT, HIGHCUT], btype='band', fs=SAMPLING_RATE, output='sos') - debug_print("Precomputed Butterworth bandpass filter coefficients.") # Set the microphone gain to the maximum gain at the start success = set_gain_db(MICROPHONE_NAME, MAX_GAIN_DB) @@ -699,35 +474,39 @@ def main(): return while True: - rms, spl, overload, clipping = calculate_noise_rms_and_spl(RTSP_URL, sos, SAMPLING_RATE) + rms, thd, spl, overload = calculate_noise_rms_and_thd(RTSP_URL, sos, SAMPLING_RATE) if rms is None: print("Failed to compute noise RMS. Retrying in 1 minute...") time.sleep(60) continue - # Print the final RMS amplitude - print(f"RMS Amplitude: {rms:.6f}") - debug_print(f"Current background noise (RMS amplitude): {rms:.6f}") - debug_print(f"Calculated SPL: {spl:.2f} dB") - - # Detect clipping and reduce gain if needed - if overload or clipping: + current_time = time.time() + + # Adjust gain if overload detected and sufficient time has passed + if overload and current_time - last_gain_adjustment_time >= GAIN_STABILIZATION_DELAY: current_gain_db = get_gain_db(MICROPHONE_NAME) if current_gain_db is not None: - NEW_GAIN_DB = current_gain_db - CLIPPING_REDUCTION_DB - if NEW_GAIN_DB < MIN_GAIN_DB: - NEW_GAIN_DB = MIN_GAIN_DB - success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB) - if success: - print(f"Overload or clipping detected. Reduced gain to {NEW_GAIN_DB} dB") - debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to overload or clipping.") - else: - print("Failed to reduce gain due to overload or clipping.") - # Skip trend adjustment in case of overload or clipping - time.sleep(60) - continue + NEW_GAIN_DB = max(current_gain_db - CLIPPING_REDUCTION_DB, MIN_GAIN_DB) + if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): + print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB") + debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to clipping.", "warning") + last_gain_adjustment_time = current_time + # Handle THD if SPL is above threshold + if spl >= THD_FUNDAMENTAL_THRESHOLD_DB: + if thd > MAX_THD_PERCENTAGE: + debug_print(f"High THD detected: {thd:.2f}%", "warning") + current_gain_db = get_gain_db(MICROPHONE_NAME) + if current_gain_db is not None: + NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB) + if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): + print(f"High THD detected. Decreased gain to {NEW_GAIN_DB} dB") + debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high THD.", "info") + last_gain_adjustment_time = current_time + else: + debug_print("THD within acceptable limits.", "info") + # Determine the noise trend if rms > NOISE_THRESHOLD_HIGH: CURRENT_TREND = 1 @@ -736,7 +515,7 @@ def main(): else: CURRENT_TREND = 0 - debug_print(f"Current trend: {CURRENT_TREND}") + debug_print(f"Current trend: {CURRENT_TREND}", "info") if CURRENT_TREND != 0: if CURRENT_TREND == PREVIOUS_TREND: @@ -747,7 +526,7 @@ def main(): else: TREND_COUNT = 0 - debug_print(f"Trend count: {TREND_COUNT}") + debug_print(f"Trend count: {TREND_COUNT}", "info") current_gain_db = get_gain_db(MICROPHONE_NAME) @@ -756,34 +535,27 @@ def main(): time.sleep(60) continue - debug_print(f"Current gain: {current_gain_db} dB") + debug_print(f"Current gain: {current_gain_db} dB", "info") - if TREND_COUNT >= TREND_COUNT_THRESHOLD: + # Adjust gain based on noise trend if threshold count is reached and stabilization delay has passed + if TREND_COUNT >= TREND_COUNT_THRESHOLD and current_time - last_gain_adjustment_time >= GAIN_STABILIZATION_DELAY: if CURRENT_TREND == 1: # Decrease gain by DECREASE_GAIN_STEP_DB dB - NEW_GAIN_DB = current_gain_db - DECREASE_GAIN_STEP_DB - if NEW_GAIN_DB < MIN_GAIN_DB: - NEW_GAIN_DB = MIN_GAIN_DB - success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB) - if success: + NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB) + if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): print(f"Background noise high. Decreased gain to {NEW_GAIN_DB} dB") - debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high noise.") - else: - print("Failed to decrease gain.") + debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high noise.", "info") + last_gain_adjustment_time = current_time elif CURRENT_TREND == -1: # Increase gain by INCREASE_GAIN_STEP_DB dB - NEW_GAIN_DB = current_gain_db + INCREASE_GAIN_STEP_DB - if NEW_GAIN_DB > MAX_GAIN_DB: - NEW_GAIN_DB = MAX_GAIN_DB - success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB) - if success: + NEW_GAIN_DB = min(current_gain_db + INCREASE_GAIN_STEP_DB, MAX_GAIN_DB) + if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): print(f"Background noise low. Increased gain to {NEW_GAIN_DB} dB") - debug_print(f"Gain increased to {NEW_GAIN_DB} dB due to low noise.") - else: - print("Failed to increase gain.") + debug_print(f"Gain increased to {NEW_GAIN_DB} dB due to low noise.", "info") + last_gain_adjustment_time = current_time TREND_COUNT = 0 else: - debug_print("No gain adjustment needed based on noise trend.") + debug_print("No gain adjustment needed based on noise trend.", "info") # Sleep for 1 minute before the next iteration time.sleep(60)