Update autogain.py

This commit is contained in:
Alexandre
2025-04-08 14:38:24 +02:00
committed by GitHub
parent 83c51c9d84
commit 79c8756261

View File

@@ -420,378 +420,321 @@ Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py"
```python ```python
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Microphone Gain Adjustment Script with THD and Overload Detection Dynamic Microphone Gain Adjustment Script with Interactive Calibration,
SelfModification, and a Test Mode for RealTime RMS Graph using plotext
This script captures audio from an RTSP stream, processes it to calculate the RMS Usage:
within the 2000-8000 Hz frequency band, detects clipping, calculates Total Harmonic Normal operation (gain control loop with default values):
Distortion (THD) over the full frequency range, and adjusts the microphone gain based ./autogain.py
on predefined noise thresholds, trends, and distortion metrics. Interactive calibration (prompts for mic specs, then asks to save values):
./autogain.py --calibrate
Test mode (realtime RMS evolution graph with color coding):
./autogain.py --test
Dependencies: Author: Your Name
- numpy Date: 2025-04-08
- scipy
- ffmpeg (installed and accessible in PATH)
- amixer (for microphone gain control)
Author: OpenAI ChatGPT
Date: 2024-10-28 (Updated)
Changelog:
- 2024-10-27: Increased sampling rate to 48,000 Hz.
- 2024-10-27: Extended THD calculation over the full frequency range.
- 2024-10-27: Added gain stabilization delay to reduce frequent adjustments.
- 2024-10-27: Improved RTSP stream resilience with retry logic.
- 2024-10-27: Enhanced debug output with logging levels.
- 2024-10-28: Added summary log mode for simplified output.
- 2024-10-28: Removed gain stabilization delay for immediate gain adjustments.
""" """
import argparse
import subprocess import subprocess
import numpy as np import numpy as np
from scipy.signal import butter, sosfilt, find_peaks from scipy.signal import butter, sosfilt
import time import time
import re import re
import sys
import os
# ---------------------------- Configuration ---------------------------- # ---------------------- Default Configuration ----------------------
# Microphone Settings
MICROPHONE_NAME = "Line In 1 Gain" MICROPHONE_NAME = "Line In 1 Gain"
MIN_GAIN_DB = 20 MIN_GAIN_DB = 20
MAX_GAIN_DB = 40 MAX_GAIN_DB = 40
DECREASE_GAIN_STEP_DB = 1 GAIN_STEP_DB = 3
INCREASE_GAIN_STEP_DB = 5
CLIPPING_REDUCTION_DB = 3
# Noise Thresholds # Default RMS thresholds (used in normal operation)
NOISE_THRESHOLD_HIGH = 0.001 NOISE_THRESHOLD_HIGH = 0.0012589
NOISE_THRESHOLD_LOW = 0.00035 NOISE_THRESHOLD_LOW = 0.00035
# Trend Detection SAMPLING_RATE = 48000 # 48 kHz
TREND_COUNT_THRESHOLD = 3 LOWCUT = 2000
HIGHCUT = 8000
FILTER_ORDER = 4
RTSP_URL = "rtsp://192.168.178.124:8554/birdmic"
SLEEP_SECONDS = 10
# Sampling Rate REFERENCE_PRESSURE = 20e-6 # 20 µPa
SAMPLING_RATE = 44100
# RTSP Stream URL # Default microphone specifications (for calibration reference)
RTSP_URL = "rtsp://192.168.178.124:8554/birdmic" DEFAULT_SNR = 80.0 # dB
DEFAULT_SELF_NOISE = 14.0 # dB-A
DEFAULT_CLIPPING = 120.0 # dB SPL
DEFAULT_SENSITIVITY = -28.0 # dB re 1 V/Pa
# Debug and Summary Modes # Compute the default full-scale amplitude (used to derive default fractions)
DEBUG = 1 # Debug Mode (1 for enabled, 0 for disabled) def_full_scale = (REFERENCE_PRESSURE *
SUMMARY_MODE = True # Summary Mode (True for summary output only) 10 ** (DEFAULT_CLIPPING / 20) *
10 ** (DEFAULT_SENSITIVITY / 20))
# ---------------------- Argument Parsing ----------------------
# Microphone Characteristics def parse_args():
MIC_SENSITIVITY_DB = -28 parser = argparse.ArgumentParser(
MIC_CLIPPING_SPL = 120 description="Dynamic Mic Gain Adjustment with calibration, test mode, and selfmodification."
)
# Calibration Constants parser.add_argument("--calibrate", action="store_true", help="Run interactive calibration mode")
REFERENCE_PRESSURE = 20e-6 parser.add_argument("--test", action="store_true", help="Run test mode to display a realtime RMS graph using plotext")
return parser.parse_args()
# THD Settings
THD_FUNDAMENTAL_THRESHOLD_DB = 60
MAX_THD_PERCENTAGE = 5.0
# -----------------------------------------------------------------------
# ---------------------- Audio & Gain Helpers ----------------------
def debug_print(msg, level="info"): def debug_print(msg, level="info"):
""" current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
Prints debug messages with logging levels if DEBUG mode is enabled. print(f"[{current_time}] [{level.upper()}] {msg}")
:param msg: The debug message to print.
:param level: Logging level - "info", "warning", "error".
"""
if DEBUG:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(f"[{current_time}] [{level.upper()}] {msg}")
def summary_log(current_gain, clipping, rms_amplitude, thd_percentage):
"""
Outputs a summary log with date, time, current gain, clipping status, background noise, and THD.
:param current_gain: Current microphone gain in dB.
:param clipping: Clipping status (yes/no).
:param rms_amplitude: Background noise RMS amplitude.
:param thd_percentage: THD in percentage.
"""
if SUMMARY_MODE:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
clipping_status = "Yes" if clipping else "No"
print(f"[{current_time}] [SUMMARY] Gain: {current_gain:.1f} dB | Clipping: {clipping_status} | "
f"Noise: {rms_amplitude:.5f} | THD: {thd_percentage:.2f}%")
def get_gain_db(mic_name): def get_gain_db(mic_name):
"""
Retrieves the current gain setting of the specified microphone using amixer.
"""
cmd = ['amixer', 'sget', mic_name]
try: try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode() output = subprocess.check_output(['amixer', 'sget', mic_name],
stderr=subprocess.STDOUT).decode()
match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output) match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output)
if match: if match:
gain_db = float(match.group(1)) return float(match.group(1))
debug_print(f"Retrieved gain: {gain_db} dB", "info")
return gain_db
else: else:
debug_print("No gain information found in amixer output.", "warning") debug_print("No gain information found.", "warning")
return None
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
debug_print(f"amixer sget failed: {e}", "error") debug_print(f"amixer sget failed: {e}", "error")
return None return None
def set_gain_db(mic_name, gain_db): def set_gain_db(mic_name, gain_db):
""" gain_db = max(min(gain_db, MAX_GAIN_DB), MIN_GAIN_DB)
Sets the gain of the specified microphone using amixer.
"""
gain_db_int = int(gain_db)
if gain_db_int > MAX_GAIN_DB:
debug_print(f"Requested gain {gain_db_int} dB exceeds MAX_GAIN_DB {MAX_GAIN_DB} dB. Skipping.", "warning")
return False # Do not exceed max gain
cmd = ['amixer', 'sset', mic_name, f'{gain_db}dB']
try: try:
subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) subprocess.check_call(['amixer', 'sset', mic_name, f'{int(gain_db)}dB'],
debug_print(f"Set gain to: {gain_db} dB", "info") stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
debug_print(f"Gain set to: {gain_db} dB", "info")
return True return True
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
debug_print(f"amixer sset failed: {e}", "error") debug_print(f"Failed to set gain: {e}", "error")
return False
def find_fundamental_frequency(fft_freqs, fft_magnitude, min_freq=2000, max_freq=8000):
"""
Dynamically finds the fundamental frequency within a specified range.
"""
idx_min = np.searchsorted(fft_freqs, min_freq)
idx_max = np.searchsorted(fft_freqs, max_freq)
if idx_max <= idx_min:
return None, 0
search_magnitude = fft_magnitude[idx_min:idx_max]
search_freqs = fft_freqs[idx_min:idx_max]
peaks, properties = find_peaks(search_magnitude, height=np.max(search_magnitude) * 0.1)
if len(peaks) == 0:
return None, 0
max_peak_idx = np.argmax(properties['peak_heights'])
fundamental_freq = search_freqs[peaks[max_peak_idx]]
fundamental_amplitude = search_magnitude[peaks[max_peak_idx]]
debug_print(f"Detected fundamental frequency: {fundamental_freq:.2f} Hz with amplitude {fundamental_amplitude:.4f}", "info")
return fundamental_freq, fundamental_amplitude
def thd_calculation(audio, sampling_rate, num_harmonics=5):
"""
Calculates Total Harmonic Distortion (THD) for the audio signal.
"""
fft_vals = np.fft.rfft(audio)
fft_freqs = np.fft.rfftfreq(len(audio), 1 / sampling_rate)
fft_magnitude = np.abs(fft_vals)
fundamental_freq, fundamental_amplitude = find_fundamental_frequency(fft_freqs, fft_magnitude)
if fundamental_freq is None or fundamental_amplitude < 1e-6:
debug_print("Fundamental frequency not detected or amplitude too low. Skipping THD calculation.", "warning")
return 0.0
harmonic_amplitudes = []
for n in range(2, num_harmonics + 1):
harmonic_freq = n * fundamental_freq
if harmonic_freq > sampling_rate / 2:
break
harmonic_idx = np.argmin(np.abs(fft_freqs - harmonic_freq))
harmonic_amp = fft_magnitude[harmonic_idx]
harmonic_amplitudes.append(harmonic_amp)
debug_print(f"Harmonic {n} frequency: {harmonic_freq:.2f} Hz, amplitude: {harmonic_amp:.4f}", "info")
harmonic_sum = np.sqrt(np.sum(np.square(harmonic_amplitudes)))
thd = (harmonic_sum / fundamental_amplitude) * 100 if fundamental_amplitude > 0 else 0.0
debug_print(f"THD Calculation: {thd:.2f}%", "info")
return thd
def calculate_spl(audio, mic_sensitivity_db):
"""
Calculates the Sound Pressure Level (SPL) from the audio signal.
"""
rms_amplitude = np.sqrt(np.mean(audio ** 2))
if rms_amplitude == 0:
debug_print("RMS amplitude is zero. SPL cannot be calculated.", "warning")
return -np.inf
mic_sensitivity_linear = 10 ** (mic_sensitivity_db / 20)
pressure = rms_amplitude / mic_sensitivity_linear
spl = 20 * np.log10(pressure / REFERENCE_PRESSURE)
debug_print(f"Calculated SPL: {spl:.2f} dB", "info")
return spl
def detect_microphone_overload(spl, mic_clipping_spl):
"""
Detects if the calculated SPL is approaching the microphone's clipping SPL.
"""
if spl >= mic_clipping_spl - 3:
debug_print("Microphone overload detected.", "warning")
return True
return False return False
def capture_audio(rtsp_url, duration=5):
cmd = ['ffmpeg', '-loglevel', 'error', '-rtsp_transport', 'tcp',
'-i', rtsp_url, '-vn', '-f', 's16le', '-acodec', 'pcm_s16le',
'-ar', str(SAMPLING_RATE), '-ac', '1', '-t', str(duration), '-']
try:
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode != 0:
debug_print(f"ffmpeg failed: {stderr.decode().strip()}", "error")
return None
return np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0
except Exception as e:
debug_print(f"Audio capture exception: {e}", "error")
return None
def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins=5): def bandpass_filter(audio, lowcut, highcut, fs, order=4):
""" sos = butter(order, [lowcut, highcut], btype='band', fs=fs, output='sos')
Captures audio from an RTSP stream, calculates RMS, THD, and SPL, and detects microphone overload. return sosfilt(sos, audio)
"""
cmd = [
'ffmpeg', '-loglevel', 'error', '-rtsp_transport', 'tcp', '-i', rtsp_url,
'-vn', '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(sampling_rate), '-ac', '1', '-t', '5', '-'
]
retries = 3 def measure_rms(audio):
for attempt in range(retries): return float(np.sqrt(np.mean(audio**2))) if len(audio) > 0 else 0.0
# ---------------------- Interactive Calibration ----------------------
def prompt_float(prompt_str, default_val):
while True:
user_input = input(f"{prompt_str} [{default_val}]: ").strip()
if user_input == "":
return default_val
try: try:
debug_print(f"Attempt {attempt + 1} to capture audio from {rtsp_url}", "info") return float(user_input)
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except ValueError:
stdout, stderr = process.communicate() print("Invalid input; please enter a numeric value.")
if process.returncode != 0: def interactive_calibration():
debug_print(f"ffmpeg failed with error: {stderr.decode()}", "error") print("\n-- INTERACTIVE CALIBRATION --")
time.sleep(5) print("Enter the microphone characteristics (press Enter to accept default):\n")
continue snr = prompt_float("1) Signal-to-Noise Ratio (dB)", DEFAULT_SNR)
self_noise = prompt_float("2) Self Noise (dB-A)", DEFAULT_SELF_NOISE)
clipping = prompt_float("3) Clipping SPL (dB)", DEFAULT_CLIPPING)
sensitivity = prompt_float("4) Sensitivity (dB re 1 V/Pa)", DEFAULT_SENSITIVITY)
return {
"snr": snr,
"self_noise": self_noise,
"clipping": clipping,
"sensitivity": sensitivity,
}
audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0 def calibrate_and_propose(mic_params):
debug_print(f"Captured {len(audio)} samples from audio stream.", "info") user_snr = mic_params["snr"]
if len(audio) == 0: # self_noise is collected but not directly used in these calculations.
debug_print("No audio data captured.", "warning") clipping = mic_params["clipping"]
time.sleep(5) sensitivity = mic_params["sensitivity"]
continue
filtered_audio = sosfilt(bandpass_sos, audio) # Compute the user's full-scale amplitude from clipping and sensitivity:
rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2)) user_full_scale = (REFERENCE_PRESSURE *
thd_percentage = thd_calculation(filtered_audio, sampling_rate) 10 ** (clipping / 20) *
spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB) 10 ** (sensitivity / 20))
overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL)
# Derive default fractions from default thresholds:
fraction_high_default = NOISE_THRESHOLD_HIGH / def_full_scale
fraction_low_default = NOISE_THRESHOLD_LOW / def_full_scale
return rms_amplitude, thd_percentage, spl, overload # Adjust thresholds using the ratio of user SNR to default SNR:
snr_ratio = user_snr / DEFAULT_SNR
except Exception as e: proposed_high = fraction_high_default * user_full_scale * snr_ratio
debug_print(f"Exception during audio processing: {e}", "error") proposed_low = fraction_low_default * user_full_scale * snr_ratio
time.sleep(5) # Small delay before retrying
return None, None, None, False # For the gain range, adjust by the difference in sensitivity:
gain_offset = (DEFAULT_SENSITIVITY - sensitivity)
proposed_min_gain = MIN_GAIN_DB + gain_offset
proposed_max_gain = MAX_GAIN_DB + gain_offset
# Show current values and proposed values:
print("\n===============================================================")
print("CURRENT VALUES:")
print("---------------------------------------------------------------")
print(f" NOISE_THRESHOLD_HIGH: {NOISE_THRESHOLD_HIGH:.7f}")
print(f" NOISE_THRESHOLD_LOW: {NOISE_THRESHOLD_LOW:.7f}")
print(f" MIN_GAIN_DB: {MIN_GAIN_DB}")
print(f" MAX_GAIN_DB: {MAX_GAIN_DB}")
print("---------------------------------------------------------------\n")
print("PROPOSED VALUES:")
print("---------------------------------------------------------------")
print(f" Proposed NOISE_THRESHOLD_HIGH: {proposed_high:.7f}")
print(f" Proposed NOISE_THRESHOLD_LOW: {proposed_low:.7f}\n")
print(" Proposed Gain Range (dB):")
print(f" MIN_GAIN_DB: {proposed_min_gain:.2f}")
print(f" MAX_GAIN_DB: {proposed_max_gain:.2f}")
print("---------------------------------------------------------------\n")
def main(): return {
""" "noise_threshold_high": proposed_high,
Main loop that continuously monitors background noise, detects clipping, calculates THD, "noise_threshold_low": proposed_low,
and adjusts microphone gain with retry logic for RTSP stream resilience. "min_gain_db": proposed_min_gain,
""" "max_gain_db": proposed_max_gain,
TREND_COUNT = 0 }
PREVIOUS_TREND = 0
# Precompute bandpass filter coefficients with updated SAMPLING_RATE def persist_calibration_to_script(script_path, proposal):
LOWCUT = 2000 subs = {
HIGHCUT = 8000 "NOISE_THRESHOLD_HIGH": f"{proposal['noise_threshold_high']:.7f}",
FILTER_ORDER = 5 "NOISE_THRESHOLD_LOW": f"{proposal['noise_threshold_low']:.7f}",
sos = butter(FILTER_ORDER, [LOWCUT, HIGHCUT], btype='band', fs=SAMPLING_RATE, output='sos') "MIN_GAIN_DB": f"{int(round(proposal['min_gain_db']))}",
"MAX_GAIN_DB": f"{int(round(proposal['max_gain_db']))}"
}
for var, val in subs.items():
cmd = f"sed -i 's|^{var} = .*|{var} = {val}|' \"{script_path}\""
os.system(cmd)
print("✅ Script has been updated with the new calibration values.\n")
# Set the microphone gain to the maximum gain at the start # ---------------------- Test Mode: Real-Time RMS Graph using plotext ----------------------
success = set_gain_db(MICROPHONE_NAME, MAX_GAIN_DB)
if success: def test_mode():
print(f"Microphone gain set to {MAX_GAIN_DB} dB at start.") try:
else: import plotext as plt
print("Failed to set microphone gain at start. Exiting.") except ImportError:
return print("plotext is required for test mode. Please install it using: pip install plotext")
sys.exit(1)
print("\n-- TEST MODE: Real-Time RMS Line Graph (plotext) --")
print("Recording 5-second samples in a loop. Press Ctrl+C to exit.\n")
rms_history = []
iterations = []
max_points = 20 # Number of samples shown in the window
i = 0
while True: while True:
rms, thd, spl, overload = calculate_noise_rms_and_thd(RTSP_URL, sos, SAMPLING_RATE) audio = capture_audio(RTSP_URL, duration=5)
if audio is None or len(audio) == 0:
if rms is None: print("No audio captured, retrying...")
print("Failed to compute noise RMS. Retrying in 1 minute...") time.sleep(5)
time.sleep(60)
continue continue
# Adjust gain if overload detected filtered_audio = bandpass_filter(audio, LOWCUT, HIGHCUT, SAMPLING_RATE, FILTER_ORDER)
if overload: rms = measure_rms(filtered_audio)
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = max(current_gain_db - CLIPPING_REDUCTION_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to clipping.", "warning")
# No stabilization delay; continue to next iteration
# Skip trend adjustment in case of clipping
summary_log(current_gain_db if current_gain_db else MIN_GAIN_DB, True, rms, thd)
time.sleep(60)
continue
# Handle THD if SPL is above threshold rms_history.append(rms)
if spl >= THD_FUNDAMENTAL_THRESHOLD_DB: iterations.append(i)
if thd > MAX_THD_PERCENTAGE: i += 1
debug_print(f"High THD detected: {thd:.2f}%", "warning")
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"High THD detected. Decreased gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high THD.", "info")
else:
debug_print("THD within acceptable limits.", "info")
else:
debug_print("SPL below THD calculation threshold. Skipping THD check.", "info")
# Determine the noise trend # Keep only the last `max_points` entries
if len(rms_history) > max_points:
rms_history = rms_history[-max_points:]
iterations = iterations[-max_points:]
# Determine status for text output
if rms > NOISE_THRESHOLD_HIGH: if rms > NOISE_THRESHOLD_HIGH:
CURRENT_TREND = 1 status = "🔴 ABOVE"
elif rms < NOISE_THRESHOLD_LOW: elif rms < NOISE_THRESHOLD_LOW:
CURRENT_TREND = -1 status = "🔵 BELOW"
else: else:
CURRENT_TREND = 0 status = "🟢 OK"
debug_print(f"Current trend: {CURRENT_TREND}", "info") # Plot the graph
plt.clf()
plt.plot(iterations, rms_history, marker="dot", color="cyan")
plt.horizontal_line(NOISE_THRESHOLD_HIGH, color="red")
plt.horizontal_line(NOISE_THRESHOLD_LOW, color="blue")
plt.title("Real-Time RMS (Line Graph)")
plt.xlabel("Iteration")
plt.ylabel("RMS")
plt.ylim(0, max(0.001, max(rms_history) * 1.2))
plt.show()
if CURRENT_TREND != 0: print(f"Current RMS: {rms:.6f}{status}")
if CURRENT_TREND == PREVIOUS_TREND: time.sleep(0.5)
TREND_COUNT += 1
else:
TREND_COUNT = 1
PREVIOUS_TREND = CURRENT_TREND
else:
TREND_COUNT = 0
debug_print(f"Trend count: {TREND_COUNT}", "info") # ---------------------- Dynamic Gain Control Loop ----------------------
current_gain_db = get_gain_db(MICROPHONE_NAME) def dynamic_gain_control():
debug_print("Starting dynamic gain controller...")
if current_gain_db is None: set_gain_db(MICROPHONE_NAME, (MIN_GAIN_DB + MAX_GAIN_DB) // 2)
print("Failed to get current gain level. Retrying in 1 minute...") while True:
time.sleep(60) audio = capture_audio(RTSP_URL)
if audio is None or len(audio) == 0:
debug_print("No audio captured; retrying...", "warning")
time.sleep(SLEEP_SECONDS)
continue continue
filtered_audio = bandpass_filter(audio, LOWCUT, HIGHCUT, SAMPLING_RATE, FILTER_ORDER)
debug_print(f"Current gain: {current_gain_db} dB", "info") rms = measure_rms(filtered_audio)
debug_print(f"Measured RMS: {rms:.6f}", "info")
# Output summary log for the current state current_gain = get_gain_db(MICROPHONE_NAME)
summary_log(current_gain_db, overload, rms, thd) if current_gain is None:
debug_print("Failed to read current gain; skipping cycle.", "warning")
# Adjust gain based on noise trend if threshold count is reached time.sleep(SLEEP_SECONDS)
if TREND_COUNT >= TREND_COUNT_THRESHOLD: continue
if CURRENT_TREND == 1 and int(current_gain_db) > MIN_GAIN_DB: if rms > NOISE_THRESHOLD_HIGH:
# Decrease gain by DECREASE_GAIN_STEP_DB dB debug_print(f"Signal too high: {rms:.6f} > {NOISE_THRESHOLD_HIGH:.7f}. Decreasing gain...", "info")
NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB) set_gain_db(MICROPHONE_NAME, current_gain - GAIN_STEP_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): elif rms < NOISE_THRESHOLD_LOW:
print(f"Background noise high. Decreased gain to {NEW_GAIN_DB} dB") debug_print(f"Signal too low: {rms:.6f} < {NOISE_THRESHOLD_LOW:.7f}. Increasing gain...", "info")
debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high noise.", "info") set_gain_db(MICROPHONE_NAME, current_gain + GAIN_STEP_DB)
TREND_COUNT = 0
elif CURRENT_TREND == -1 and int(current_gain_db) < MAX_GAIN_DB:
# Increase gain by INCREASE_GAIN_STEP_DB dB
NEW_GAIN_DB = min(current_gain_db + INCREASE_GAIN_STEP_DB, MAX_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Background noise low. Increased gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain increased to {NEW_GAIN_DB} dB due to low noise.", "info")
TREND_COUNT = 0
else: else:
debug_print("No gain adjustment needed based on noise trend.", "info") debug_print("RMS within acceptable range; no gain change.", "info")
time.sleep(SLEEP_SECONDS)
# Sleep for 1 minute before the next iteration # ---------------------- Main ----------------------
time.sleep(60)
def main():
args = parse_args()
if args.calibrate:
mic_params = interactive_calibration()
proposal = calibrate_and_propose(mic_params)
save = input("Save these values permanently into the script? [y/N]: ").strip().lower()
if save in ["y", "yes"]:
script_path = os.path.abspath(__file__)
persist_calibration_to_script(script_path, proposal)
print("👍 Calibration values saved. Exiting now.\n")
else:
print("❌ Not saving values. Exiting.\n")
sys.exit(0)
if args.test:
test_mode()
sys.exit(0)
# Normal operation: run dynamic gain control.
dynamic_gain_control()
if __name__ == "__main__": if __name__ == "__main__":
main() main()