diff --git a/birdnet-pi/DOCS.md b/birdnet-pi/DOCS.md index 178a6137c..45740be17 100644 --- a/birdnet-pi/DOCS.md +++ b/birdnet-pi/DOCS.md @@ -420,378 +420,321 @@ Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py" ```python #!/usr/bin/env python3 """ -Microphone Gain Adjustment Script with THD and Overload Detection +Dynamic Microphone Gain Adjustment Script with Interactive Calibration, +Self‑Modification, and a Test Mode for Real‑Time RMS Graph using plotext -This script captures audio from an RTSP stream, processes it to calculate the RMS -within the 2000-8000 Hz frequency band, detects clipping, calculates Total Harmonic -Distortion (THD) over the full frequency range, and adjusts the microphone gain based -on predefined noise thresholds, trends, and distortion metrics. +Usage: + Normal operation (gain control loop with default values): + ./autogain.py + Interactive calibration (prompts for mic specs, then asks to save values): + ./autogain.py --calibrate + Test mode (real‑time RMS evolution graph with color coding): + ./autogain.py --test -Dependencies: -- numpy -- scipy -- ffmpeg (installed and accessible in PATH) -- amixer (for microphone gain control) - -Author: OpenAI ChatGPT -Date: 2024-10-28 (Updated) - -Changelog: -- 2024-10-27: Increased sampling rate to 48,000 Hz. -- 2024-10-27: Extended THD calculation over the full frequency range. -- 2024-10-27: Added gain stabilization delay to reduce frequent adjustments. -- 2024-10-27: Improved RTSP stream resilience with retry logic. -- 2024-10-27: Enhanced debug output with logging levels. -- 2024-10-28: Added summary log mode for simplified output. -- 2024-10-28: Removed gain stabilization delay for immediate gain adjustments. +Author: Your Name +Date: 2025-04-08 """ +import argparse import subprocess import numpy as np -from scipy.signal import butter, sosfilt, find_peaks +from scipy.signal import butter, sosfilt import time import re +import sys +import os -# ---------------------------- Configuration ---------------------------- +# ---------------------- Default Configuration ---------------------- -# Microphone Settings MICROPHONE_NAME = "Line In 1 Gain" MIN_GAIN_DB = 20 MAX_GAIN_DB = 40 -DECREASE_GAIN_STEP_DB = 1 -INCREASE_GAIN_STEP_DB = 5 -CLIPPING_REDUCTION_DB = 3 +GAIN_STEP_DB = 3 -# Noise Thresholds -NOISE_THRESHOLD_HIGH = 0.001 -NOISE_THRESHOLD_LOW = 0.00035 +# Default RMS thresholds (used in normal operation) +NOISE_THRESHOLD_HIGH = 0.0012589 +NOISE_THRESHOLD_LOW = 0.00035 -# Trend Detection -TREND_COUNT_THRESHOLD = 3 +SAMPLING_RATE = 48000 # 48 kHz +LOWCUT = 2000 +HIGHCUT = 8000 +FILTER_ORDER = 4 +RTSP_URL = "rtsp://192.168.178.124:8554/birdmic" +SLEEP_SECONDS = 10 -# Sampling Rate -SAMPLING_RATE = 44100 +REFERENCE_PRESSURE = 20e-6 # 20 µPa -# RTSP Stream URL -RTSP_URL = "rtsp://192.168.178.124:8554/birdmic" +# Default microphone specifications (for calibration reference) +DEFAULT_SNR = 80.0 # dB +DEFAULT_SELF_NOISE = 14.0 # dB-A +DEFAULT_CLIPPING = 120.0 # dB SPL +DEFAULT_SENSITIVITY = -28.0 # dB re 1 V/Pa -# Debug and Summary Modes -DEBUG = 1 # Debug Mode (1 for enabled, 0 for disabled) -SUMMARY_MODE = True # Summary Mode (True for summary output only) +# Compute the default full-scale amplitude (used to derive default fractions) +def_full_scale = (REFERENCE_PRESSURE * + 10 ** (DEFAULT_CLIPPING / 20) * + 10 ** (DEFAULT_SENSITIVITY / 20)) +# ---------------------- Argument Parsing ---------------------- -# Microphone Characteristics -MIC_SENSITIVITY_DB = -28 -MIC_CLIPPING_SPL = 120 - -# Calibration Constants -REFERENCE_PRESSURE = 20e-6 - -# THD Settings -THD_FUNDAMENTAL_THRESHOLD_DB = 60 -MAX_THD_PERCENTAGE = 5.0 - -# ----------------------------------------------------------------------- +def parse_args(): + parser = argparse.ArgumentParser( + description="Dynamic Mic Gain Adjustment with calibration, test mode, and self‑modification." + ) + parser.add_argument("--calibrate", action="store_true", help="Run interactive calibration mode") + parser.add_argument("--test", action="store_true", help="Run test mode to display a real‑time RMS graph using plotext") + return parser.parse_args() +# ---------------------- Audio & Gain Helpers ---------------------- def debug_print(msg, level="info"): - """ - Prints debug messages with logging levels if DEBUG mode is enabled. - :param msg: The debug message to print. - :param level: Logging level - "info", "warning", "error". - """ - if DEBUG: - current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - print(f"[{current_time}] [{level.upper()}] {msg}") - - -def summary_log(current_gain, clipping, rms_amplitude, thd_percentage): - """ - Outputs a summary log with date, time, current gain, clipping status, background noise, and THD. - :param current_gain: Current microphone gain in dB. - :param clipping: Clipping status (yes/no). - :param rms_amplitude: Background noise RMS amplitude. - :param thd_percentage: THD in percentage. - """ - if SUMMARY_MODE: - current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - clipping_status = "Yes" if clipping else "No" - print(f"[{current_time}] [SUMMARY] Gain: {current_gain:.1f} dB | Clipping: {clipping_status} | " - f"Noise: {rms_amplitude:.5f} | THD: {thd_percentage:.2f}%") - + current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + print(f"[{current_time}] [{level.upper()}] {msg}") def get_gain_db(mic_name): - """ - Retrieves the current gain setting of the specified microphone using amixer. - """ - cmd = ['amixer', 'sget', mic_name] try: - output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode() + output = subprocess.check_output(['amixer', 'sget', mic_name], + stderr=subprocess.STDOUT).decode() match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output) if match: - gain_db = float(match.group(1)) - debug_print(f"Retrieved gain: {gain_db} dB", "info") - return gain_db + return float(match.group(1)) else: - debug_print("No gain information found in amixer output.", "warning") - return None + debug_print("No gain information found.", "warning") except subprocess.CalledProcessError as e: debug_print(f"amixer sget failed: {e}", "error") - return None - + return None def set_gain_db(mic_name, gain_db): - """ - Sets the gain of the specified microphone using amixer. - """ - gain_db_int = int(gain_db) - if gain_db_int > MAX_GAIN_DB: - debug_print(f"Requested gain {gain_db_int} dB exceeds MAX_GAIN_DB {MAX_GAIN_DB} dB. Skipping.", "warning") - return False # Do not exceed max gain - cmd = ['amixer', 'sset', mic_name, f'{gain_db}dB'] + gain_db = max(min(gain_db, MAX_GAIN_DB), MIN_GAIN_DB) try: - subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) - debug_print(f"Set gain to: {gain_db} dB", "info") + subprocess.check_call(['amixer', 'sset', mic_name, f'{int(gain_db)}dB'], + stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) + debug_print(f"Gain set to: {gain_db} dB", "info") return True except subprocess.CalledProcessError as e: - debug_print(f"amixer sset failed: {e}", "error") - return False - - -def find_fundamental_frequency(fft_freqs, fft_magnitude, min_freq=2000, max_freq=8000): - """ - Dynamically finds the fundamental frequency within a specified range. - """ - idx_min = np.searchsorted(fft_freqs, min_freq) - idx_max = np.searchsorted(fft_freqs, max_freq) - if idx_max <= idx_min: - return None, 0 - - search_magnitude = fft_magnitude[idx_min:idx_max] - search_freqs = fft_freqs[idx_min:idx_max] - peaks, properties = find_peaks(search_magnitude, height=np.max(search_magnitude) * 0.1) - if len(peaks) == 0: - return None, 0 - - max_peak_idx = np.argmax(properties['peak_heights']) - fundamental_freq = search_freqs[peaks[max_peak_idx]] - fundamental_amplitude = search_magnitude[peaks[max_peak_idx]] - - debug_print(f"Detected fundamental frequency: {fundamental_freq:.2f} Hz with amplitude {fundamental_amplitude:.4f}", "info") - return fundamental_freq, fundamental_amplitude - - -def thd_calculation(audio, sampling_rate, num_harmonics=5): - """ - Calculates Total Harmonic Distortion (THD) for the audio signal. - """ - fft_vals = np.fft.rfft(audio) - fft_freqs = np.fft.rfftfreq(len(audio), 1 / sampling_rate) - fft_magnitude = np.abs(fft_vals) - fundamental_freq, fundamental_amplitude = find_fundamental_frequency(fft_freqs, fft_magnitude) - - if fundamental_freq is None or fundamental_amplitude < 1e-6: - debug_print("Fundamental frequency not detected or amplitude too low. Skipping THD calculation.", "warning") - return 0.0 - - harmonic_amplitudes = [] - for n in range(2, num_harmonics + 1): - harmonic_freq = n * fundamental_freq - if harmonic_freq > sampling_rate / 2: - break - harmonic_idx = np.argmin(np.abs(fft_freqs - harmonic_freq)) - harmonic_amp = fft_magnitude[harmonic_idx] - harmonic_amplitudes.append(harmonic_amp) - debug_print(f"Harmonic {n} frequency: {harmonic_freq:.2f} Hz, amplitude: {harmonic_amp:.4f}", "info") - - harmonic_sum = np.sqrt(np.sum(np.square(harmonic_amplitudes))) - thd = (harmonic_sum / fundamental_amplitude) * 100 if fundamental_amplitude > 0 else 0.0 - debug_print(f"THD Calculation: {thd:.2f}%", "info") - return thd - - -def calculate_spl(audio, mic_sensitivity_db): - """ - Calculates the Sound Pressure Level (SPL) from the audio signal. - """ - rms_amplitude = np.sqrt(np.mean(audio ** 2)) - if rms_amplitude == 0: - debug_print("RMS amplitude is zero. SPL cannot be calculated.", "warning") - return -np.inf - - mic_sensitivity_linear = 10 ** (mic_sensitivity_db / 20) - pressure = rms_amplitude / mic_sensitivity_linear - spl = 20 * np.log10(pressure / REFERENCE_PRESSURE) - debug_print(f"Calculated SPL: {spl:.2f} dB", "info") - return spl - - -def detect_microphone_overload(spl, mic_clipping_spl): - """ - Detects if the calculated SPL is approaching the microphone's clipping SPL. - """ - if spl >= mic_clipping_spl - 3: - debug_print("Microphone overload detected.", "warning") - return True + debug_print(f"Failed to set gain: {e}", "error") return False +def capture_audio(rtsp_url, duration=5): + cmd = ['ffmpeg', '-loglevel', 'error', '-rtsp_transport', 'tcp', + '-i', rtsp_url, '-vn', '-f', 's16le', '-acodec', 'pcm_s16le', + '-ar', str(SAMPLING_RATE), '-ac', '1', '-t', str(duration), '-'] + try: + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + if process.returncode != 0: + debug_print(f"ffmpeg failed: {stderr.decode().strip()}", "error") + return None + return np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0 + except Exception as e: + debug_print(f"Audio capture exception: {e}", "error") + return None -def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins=5): - """ - Captures audio from an RTSP stream, calculates RMS, THD, and SPL, and detects microphone overload. - """ - cmd = [ - 'ffmpeg', '-loglevel', 'error', '-rtsp_transport', 'tcp', '-i', rtsp_url, - '-vn', '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(sampling_rate), '-ac', '1', '-t', '5', '-' - ] +def bandpass_filter(audio, lowcut, highcut, fs, order=4): + sos = butter(order, [lowcut, highcut], btype='band', fs=fs, output='sos') + return sosfilt(sos, audio) - retries = 3 - for attempt in range(retries): +def measure_rms(audio): + return float(np.sqrt(np.mean(audio**2))) if len(audio) > 0 else 0.0 + +# ---------------------- Interactive Calibration ---------------------- + +def prompt_float(prompt_str, default_val): + while True: + user_input = input(f"{prompt_str} [{default_val}]: ").strip() + if user_input == "": + return default_val try: - debug_print(f"Attempt {attempt + 1} to capture audio from {rtsp_url}", "info") - process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = process.communicate() + return float(user_input) + except ValueError: + print("Invalid input; please enter a numeric value.") - if process.returncode != 0: - debug_print(f"ffmpeg failed with error: {stderr.decode()}", "error") - time.sleep(5) - continue +def interactive_calibration(): + print("\n-- INTERACTIVE CALIBRATION --") + print("Enter the microphone characteristics (press Enter to accept default):\n") + snr = prompt_float("1) Signal-to-Noise Ratio (dB)", DEFAULT_SNR) + self_noise = prompt_float("2) Self Noise (dB-A)", DEFAULT_SELF_NOISE) + clipping = prompt_float("3) Clipping SPL (dB)", DEFAULT_CLIPPING) + sensitivity = prompt_float("4) Sensitivity (dB re 1 V/Pa)", DEFAULT_SENSITIVITY) + return { + "snr": snr, + "self_noise": self_noise, + "clipping": clipping, + "sensitivity": sensitivity, + } - audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0 - debug_print(f"Captured {len(audio)} samples from audio stream.", "info") - if len(audio) == 0: - debug_print("No audio data captured.", "warning") - time.sleep(5) - continue +def calibrate_and_propose(mic_params): + user_snr = mic_params["snr"] + # self_noise is collected but not directly used in these calculations. + clipping = mic_params["clipping"] + sensitivity = mic_params["sensitivity"] - filtered_audio = sosfilt(bandpass_sos, audio) - rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2)) - thd_percentage = thd_calculation(filtered_audio, sampling_rate) - spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB) - overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL) + # Compute the user's full-scale amplitude from clipping and sensitivity: + user_full_scale = (REFERENCE_PRESSURE * + 10 ** (clipping / 20) * + 10 ** (sensitivity / 20)) + + # Derive default fractions from default thresholds: + fraction_high_default = NOISE_THRESHOLD_HIGH / def_full_scale + fraction_low_default = NOISE_THRESHOLD_LOW / def_full_scale - return rms_amplitude, thd_percentage, spl, overload + # Adjust thresholds using the ratio of user SNR to default SNR: + snr_ratio = user_snr / DEFAULT_SNR - except Exception as e: - debug_print(f"Exception during audio processing: {e}", "error") - time.sleep(5) # Small delay before retrying + proposed_high = fraction_high_default * user_full_scale * snr_ratio + proposed_low = fraction_low_default * user_full_scale * snr_ratio - return None, None, None, False + # For the gain range, adjust by the difference in sensitivity: + gain_offset = (DEFAULT_SENSITIVITY - sensitivity) + proposed_min_gain = MIN_GAIN_DB + gain_offset + proposed_max_gain = MAX_GAIN_DB + gain_offset + # Show current values and proposed values: + print("\n===============================================================") + print("CURRENT VALUES:") + print("---------------------------------------------------------------") + print(f" NOISE_THRESHOLD_HIGH: {NOISE_THRESHOLD_HIGH:.7f}") + print(f" NOISE_THRESHOLD_LOW: {NOISE_THRESHOLD_LOW:.7f}") + print(f" MIN_GAIN_DB: {MIN_GAIN_DB}") + print(f" MAX_GAIN_DB: {MAX_GAIN_DB}") + print("---------------------------------------------------------------\n") + print("PROPOSED VALUES:") + print("---------------------------------------------------------------") + print(f" Proposed NOISE_THRESHOLD_HIGH: {proposed_high:.7f}") + print(f" Proposed NOISE_THRESHOLD_LOW: {proposed_low:.7f}\n") + print(" Proposed Gain Range (dB):") + print(f" MIN_GAIN_DB: {proposed_min_gain:.2f}") + print(f" MAX_GAIN_DB: {proposed_max_gain:.2f}") + print("---------------------------------------------------------------\n") -def main(): - """ - Main loop that continuously monitors background noise, detects clipping, calculates THD, - and adjusts microphone gain with retry logic for RTSP stream resilience. - """ - TREND_COUNT = 0 - PREVIOUS_TREND = 0 + return { + "noise_threshold_high": proposed_high, + "noise_threshold_low": proposed_low, + "min_gain_db": proposed_min_gain, + "max_gain_db": proposed_max_gain, + } - # Precompute bandpass filter coefficients with updated SAMPLING_RATE - LOWCUT = 2000 - HIGHCUT = 8000 - FILTER_ORDER = 5 - sos = butter(FILTER_ORDER, [LOWCUT, HIGHCUT], btype='band', fs=SAMPLING_RATE, output='sos') +def persist_calibration_to_script(script_path, proposal): + subs = { + "NOISE_THRESHOLD_HIGH": f"{proposal['noise_threshold_high']:.7f}", + "NOISE_THRESHOLD_LOW": f"{proposal['noise_threshold_low']:.7f}", + "MIN_GAIN_DB": f"{int(round(proposal['min_gain_db']))}", + "MAX_GAIN_DB": f"{int(round(proposal['max_gain_db']))}" + } + for var, val in subs.items(): + cmd = f"sed -i 's|^{var} = .*|{var} = {val}|' \"{script_path}\"" + os.system(cmd) + print("✅ Script has been updated with the new calibration values.\n") - # Set the microphone gain to the maximum gain at the start - success = set_gain_db(MICROPHONE_NAME, MAX_GAIN_DB) - if success: - print(f"Microphone gain set to {MAX_GAIN_DB} dB at start.") - else: - print("Failed to set microphone gain at start. Exiting.") - return +# ---------------------- Test Mode: Real-Time RMS Graph using plotext ---------------------- + +def test_mode(): + try: + import plotext as plt + except ImportError: + print("plotext is required for test mode. Please install it using: pip install plotext") + sys.exit(1) + + print("\n-- TEST MODE: Real-Time RMS Line Graph (plotext) --") + print("Recording 5-second samples in a loop. Press Ctrl+C to exit.\n") + + rms_history = [] + iterations = [] + max_points = 20 # Number of samples shown in the window + i = 0 while True: - rms, thd, spl, overload = calculate_noise_rms_and_thd(RTSP_URL, sos, SAMPLING_RATE) - - if rms is None: - print("Failed to compute noise RMS. Retrying in 1 minute...") - time.sleep(60) + audio = capture_audio(RTSP_URL, duration=5) + if audio is None or len(audio) == 0: + print("No audio captured, retrying...") + time.sleep(5) continue - # Adjust gain if overload detected - if overload: - current_gain_db = get_gain_db(MICROPHONE_NAME) - if current_gain_db is not None: - NEW_GAIN_DB = max(current_gain_db - CLIPPING_REDUCTION_DB, MIN_GAIN_DB) - if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): - print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB") - debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to clipping.", "warning") - # No stabilization delay; continue to next iteration - # Skip trend adjustment in case of clipping - summary_log(current_gain_db if current_gain_db else MIN_GAIN_DB, True, rms, thd) - time.sleep(60) - continue + filtered_audio = bandpass_filter(audio, LOWCUT, HIGHCUT, SAMPLING_RATE, FILTER_ORDER) + rms = measure_rms(filtered_audio) - # Handle THD if SPL is above threshold - if spl >= THD_FUNDAMENTAL_THRESHOLD_DB: - if thd > MAX_THD_PERCENTAGE: - debug_print(f"High THD detected: {thd:.2f}%", "warning") - current_gain_db = get_gain_db(MICROPHONE_NAME) - if current_gain_db is not None: - NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB) - if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): - print(f"High THD detected. Decreased gain to {NEW_GAIN_DB} dB") - debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high THD.", "info") - else: - debug_print("THD within acceptable limits.", "info") - else: - debug_print("SPL below THD calculation threshold. Skipping THD check.", "info") + rms_history.append(rms) + iterations.append(i) + i += 1 - # Determine the noise trend + # Keep only the last `max_points` entries + if len(rms_history) > max_points: + rms_history = rms_history[-max_points:] + iterations = iterations[-max_points:] + + # Determine status for text output if rms > NOISE_THRESHOLD_HIGH: - CURRENT_TREND = 1 + status = "🔴 ABOVE" elif rms < NOISE_THRESHOLD_LOW: - CURRENT_TREND = -1 + status = "🔵 BELOW" else: - CURRENT_TREND = 0 + status = "🟢 OK" - debug_print(f"Current trend: {CURRENT_TREND}", "info") + # Plot the graph + plt.clf() + plt.plot(iterations, rms_history, marker="dot", color="cyan") + plt.horizontal_line(NOISE_THRESHOLD_HIGH, color="red") + plt.horizontal_line(NOISE_THRESHOLD_LOW, color="blue") + plt.title("Real-Time RMS (Line Graph)") + plt.xlabel("Iteration") + plt.ylabel("RMS") + plt.ylim(0, max(0.001, max(rms_history) * 1.2)) + plt.show() - if CURRENT_TREND != 0: - if CURRENT_TREND == PREVIOUS_TREND: - TREND_COUNT += 1 - else: - TREND_COUNT = 1 - PREVIOUS_TREND = CURRENT_TREND - else: - TREND_COUNT = 0 + print(f"Current RMS: {rms:.6f} — {status}") + time.sleep(0.5) - debug_print(f"Trend count: {TREND_COUNT}", "info") +# ---------------------- Dynamic Gain Control Loop ---------------------- - current_gain_db = get_gain_db(MICROPHONE_NAME) - - if current_gain_db is None: - print("Failed to get current gain level. Retrying in 1 minute...") - time.sleep(60) +def dynamic_gain_control(): + debug_print("Starting dynamic gain controller...") + set_gain_db(MICROPHONE_NAME, (MIN_GAIN_DB + MAX_GAIN_DB) // 2) + while True: + audio = capture_audio(RTSP_URL) + if audio is None or len(audio) == 0: + debug_print("No audio captured; retrying...", "warning") + time.sleep(SLEEP_SECONDS) continue - - debug_print(f"Current gain: {current_gain_db} dB", "info") - - # Output summary log for the current state - summary_log(current_gain_db, overload, rms, thd) - - # Adjust gain based on noise trend if threshold count is reached - if TREND_COUNT >= TREND_COUNT_THRESHOLD: - if CURRENT_TREND == 1 and int(current_gain_db) > MIN_GAIN_DB: - # Decrease gain by DECREASE_GAIN_STEP_DB dB - NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB) - if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): - print(f"Background noise high. Decreased gain to {NEW_GAIN_DB} dB") - debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high noise.", "info") - TREND_COUNT = 0 - elif CURRENT_TREND == -1 and int(current_gain_db) < MAX_GAIN_DB: - # Increase gain by INCREASE_GAIN_STEP_DB dB - NEW_GAIN_DB = min(current_gain_db + INCREASE_GAIN_STEP_DB, MAX_GAIN_DB) - if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): - print(f"Background noise low. Increased gain to {NEW_GAIN_DB} dB") - debug_print(f"Gain increased to {NEW_GAIN_DB} dB due to low noise.", "info") - TREND_COUNT = 0 + filtered_audio = bandpass_filter(audio, LOWCUT, HIGHCUT, SAMPLING_RATE, FILTER_ORDER) + rms = measure_rms(filtered_audio) + debug_print(f"Measured RMS: {rms:.6f}", "info") + current_gain = get_gain_db(MICROPHONE_NAME) + if current_gain is None: + debug_print("Failed to read current gain; skipping cycle.", "warning") + time.sleep(SLEEP_SECONDS) + continue + if rms > NOISE_THRESHOLD_HIGH: + debug_print(f"Signal too high: {rms:.6f} > {NOISE_THRESHOLD_HIGH:.7f}. Decreasing gain...", "info") + set_gain_db(MICROPHONE_NAME, current_gain - GAIN_STEP_DB) + elif rms < NOISE_THRESHOLD_LOW: + debug_print(f"Signal too low: {rms:.6f} < {NOISE_THRESHOLD_LOW:.7f}. Increasing gain...", "info") + set_gain_db(MICROPHONE_NAME, current_gain + GAIN_STEP_DB) else: - debug_print("No gain adjustment needed based on noise trend.", "info") + debug_print("RMS within acceptable range; no gain change.", "info") + time.sleep(SLEEP_SECONDS) - # Sleep for 1 minute before the next iteration - time.sleep(60) +# ---------------------- Main ---------------------- +def main(): + args = parse_args() + + if args.calibrate: + mic_params = interactive_calibration() + proposal = calibrate_and_propose(mic_params) + save = input("Save these values permanently into the script? [y/N]: ").strip().lower() + if save in ["y", "yes"]: + script_path = os.path.abspath(__file__) + persist_calibration_to_script(script_path, proposal) + print("👍 Calibration values saved. Exiting now.\n") + else: + print("❌ Not saving values. Exiting.\n") + sys.exit(0) + + if args.test: + test_mode() + sys.exit(0) + + # Normal operation: run dynamic gain control. + dynamic_gain_control() if __name__ == "__main__": main()