Update DOCS.md

This commit is contained in:
Alexandre
2024-10-28 07:12:44 +01:00
committed by GitHub
parent 845810dccc
commit a040378363

View File

@@ -429,266 +429,41 @@ echo "Mono optimization applied. Only using primary input and balanced outputs."
Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py"
```python
#!/usr/bin/env python3
# Calculate RMS
rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2))
"""
Microphone Gain Adjustment Script with Clipping and Overload Detection
# Calculate THD over the full range
thd_percentage = thd_calculation(filtered_audio, sampling_rate)
This script captures audio from an RTSP stream, processes it to calculate the RMS
within the 2000-8000 Hz frequency band, detects clipping, calculates Sound Pressure Level (SPL),
and adjusts the microphone gain based on predefined noise thresholds, trends, and overload metrics.
# Calculate SPL
spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB)
Dependencies:
- numpy
- scipy
- ffmpeg (installed and accessible in PATH)
- amixer (for microphone gain control)
# Detect microphone overload
overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL)
Author: alexbelgium
Date: 27-Oct-2024
return rms_amplitude, thd_percentage, spl, overload
Changelog:
-----------
2024-04-27: Initial version
- Implemented basic microphone gain adjustment based on RMS levels and Total Harmonic Distortion (THD) calculations.
- Introduced overload detection based on Sound Pressure Level (SPL).
except Exception as e:
debug_print(f"Exception during audio processing: {e}", "error")
time.sleep(5) # Small delay before retrying
2024-10-27: Updated for simplified noise and clipping detection
- Removed THD calculations, as natural bird harmonics affect the distortion metric.
- Introduced direct clipping detection by analyzing audio sample amplitudes.
- Refocused the gain adjustment criteria on RMS amplitude and SPL within the target band (2000-8000 Hz).
- Simplified main loop to focus on RMS, SPL, and clipping instead of THD.
- Added `detect_clipping` function to identify clipping events.
- Updated debug logging to enhance traceability and include SPL measurements.
- Adjusted trend detection logic for more responsive gain adjustment.
"""
import subprocess
import numpy as np
from scipy.signal import butter, sosfilt
import time
import re
# ---------------------------- Configuration ----------------------------
# Microphone Settings
MICROPHONE_NAME = "Line In 1 Gain" # Adjust to match your microphone's control name
MIN_GAIN_DB = 20 # Minimum gain in dB
MAX_GAIN_DB = 45 # Maximum gain in dB
DECREASE_GAIN_STEP_DB = 1 # Gain decrease step in dB
INCREASE_GAIN_STEP_DB = 5 # Gain increase step in dB
CLIPPING_REDUCTION_DB = 3 # Reduction in dB if clipping is detected
# Noise Thresholds
NOISE_THRESHOLD_HIGH = 0.001 # Upper threshold for noise RMS amplitude
NOISE_THRESHOLD_LOW = 0.00035 # Lower threshold for noise RMS amplitude
# Trend Detection
TREND_COUNT_THRESHOLD = 3 # Number of consecutive trends needed to adjust gain
# RTSP Stream URL
RTSP_URL = "rtsp://192.168.178.124:8554/birdmic" # Replace with your RTSP stream URL
# Debug Mode (1 for enabled, 0 for disabled)
DEBUG = 1
# Microphone Characteristics
MIC_SENSITIVITY_DB = -28 # dB (0 dB = 1V/Pa)
MIC_CLIPPING_SPL = 120 # dB SPL at 1 kHz
# Calibration Constants (These may need to be adjusted based on actual calibration)
REFERENCE_PRESSURE = 20e-6 # 20 µPa, standard reference for SPL
# -----------------------------------------------------------------------
def debug_print(msg):
"""
Prints debug messages if DEBUG mode is enabled.
:param msg: The debug message to print.
"""
if DEBUG:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(f"[{current_time}] [DEBUG] {msg}")
def get_gain_db(mic_name):
"""
Retrieves the current gain setting of the specified microphone using amixer.
:param mic_name: The name of the microphone control in amixer.
:return: The current gain in dB as a float, or None if retrieval fails.
"""
cmd = ['amixer', 'sget', mic_name]
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode()
# Regex to find patterns like [30.00dB]
match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output)
if match:
gain_db = float(match.group(1))
debug_print(f"Retrieved gain: {gain_db} dB")
return gain_db
else:
debug_print("No gain information found in amixer output.")
return None
except subprocess.CalledProcessError as e:
debug_print(f"amixer sget failed: {e}")
return None
def set_gain_db(mic_name, gain_db):
"""
Sets the gain of the specified microphone using amixer.
:param mic_name: The name of the microphone control in amixer.
:param gain_db: The desired gain in dB.
:return: True if the gain was set successfully, False otherwise.
"""
cmd = ['amixer', 'sset', mic_name, f'{gain_db}dB']
try:
subprocess.check_call(cmd, stderr=subprocess.STDOUT)
debug_print(f"Set gain to: {gain_db} dB")
return True
except subprocess.CalledProcessError as e:
debug_print(f"amixer sset failed: {e}")
return False
def calculate_spl(audio, mic_sensitivity_db):
"""
Calculates the Sound Pressure Level (SPL) from the audio signal.
:param audio: The audio signal as a numpy array.
:param mic_sensitivity_db: Microphone sensitivity in dB (0 dB = 1V/Pa).
:return: SPL in dB.
"""
# Calculate RMS amplitude
rms_amplitude = np.sqrt(np.mean(audio ** 2))
if rms_amplitude == 0:
debug_print("RMS amplitude is zero. SPL cannot be calculated.")
return -np.inf
# Convert RMS amplitude to voltage
# Assuming audio is normalized between -1 and 1
# Convert voltage to pressure (Pa)
mic_sensitivity_linear = 10 ** (mic_sensitivity_db / 20) # V/Pa
pressure = rms_amplitude / mic_sensitivity_linear # Pa
# Calculate SPL
spl = 20 * np.log10(pressure / REFERENCE_PRESSURE)
debug_print(f"Calculated SPL: {spl:.2f} dB")
return spl
def detect_microphone_overload(spl, mic_clipping_spl):
"""
Detects if the calculated SPL is approaching the microphone's clipping SPL.
:param spl: The calculated SPL.
:param mic_clipping_spl: The microphone's clipping SPL.
:return: True if overload is detected, False otherwise.
"""
if spl >= mic_clipping_spl - 3: # Consider overload if within 3 dB of clipping SPL
debug_print("Microphone overload detected.")
return True
return False
def detect_clipping(audio):
"""
Detects if clipping has occurred in the audio signal.
:param audio: The audio signal as a numpy array.
:return: True if clipping is detected, False otherwise.
"""
max_amplitude = np.max(np.abs(audio))
if max_amplitude >= 1.0:
debug_print("Clipping detected in audio signal.")
return True
return False
def calculate_noise_rms_and_spl(rtsp_url, bandpass_sos, sampling_rate):
"""
Captures audio from an RTSP stream, calculates RMS, SPL, and detects microphone overload.
:param rtsp_url: The RTSP stream URL.
:param bandpass_sos: Precomputed bandpass filter coefficients (Second-Order Sections).
:param sampling_rate: Sampling rate of the audio signal.
:return: Tuple containing the RMS amplitude, SPL value, overload status, and clipping status.
"""
cmd = [
'ffmpeg',
'-loglevel', 'error',
'-rtsp_transport', 'tcp',
'-i', rtsp_url,
'-vn',
'-f', 's16le',
'-acodec', 'pcm_s16le',
'-ar', str(sampling_rate),
'-ac', '1',
'-t', '5',
'-'
]
try:
debug_print(f"Starting audio capture from {rtsp_url}")
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode != 0:
debug_print(f"ffmpeg failed with error: {stderr.decode()}")
return None, None, False, False
# Convert raw PCM data to numpy array
audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0
debug_print(f"Captured {len(audio)} samples from audio stream.")
if len(audio) == 0:
debug_print("No audio data captured.")
return None, None, False, False
# Detect clipping
clipping = detect_clipping(audio)
# Apply bandpass filter
filtered_audio = sosfilt(bandpass_sos, audio)
debug_print("Applied bandpass filter to audio data.")
# Calculate RMS
rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2))
# Calculate SPL
spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB)
# Detect microphone overload
overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL)
return rms_amplitude, spl, overload, clipping
except Exception as e:
debug_print(f"Exception during audio processing: {e}")
return None, None, False, False
return None, None, None, False
def main():
"""
Main loop that continuously monitors background noise, detects clipping,
and adjusts microphone gain accordingly.
Main loop that continuously monitors background noise, detects clipping, calculates THD,
and adjusts microphone gain with stabilization delay and retry logic for RTSP stream resilience.
"""
TREND_COUNT = 0
PREVIOUS_TREND = 0
last_gain_adjustment_time = time.time() - GAIN_STABILIZATION_DELAY # Initialize
# Precompute the bandpass filter coefficients
LOWCUT = 2000 # Lower frequency bound in Hz
HIGHCUT = 8000 # Upper frequency bound in Hz
FILTER_ORDER = 5 # Order of the Butterworth filter
SAMPLING_RATE = 32000 # Sampling rate in Hz
# Precompute bandpass filter coefficients with updated SAMPLING_RATE
LOWCUT = 2000
HIGHCUT = 8000
FILTER_ORDER = 5
sos = butter(FILTER_ORDER, [LOWCUT, HIGHCUT], btype='band', fs=SAMPLING_RATE, output='sos')
debug_print("Precomputed Butterworth bandpass filter coefficients.")
# Set the microphone gain to the maximum gain at the start
success = set_gain_db(MICROPHONE_NAME, MAX_GAIN_DB)
@@ -699,35 +474,39 @@ def main():
return
while True:
rms, spl, overload, clipping = calculate_noise_rms_and_spl(RTSP_URL, sos, SAMPLING_RATE)
rms, thd, spl, overload = calculate_noise_rms_and_thd(RTSP_URL, sos, SAMPLING_RATE)
if rms is None:
print("Failed to compute noise RMS. Retrying in 1 minute...")
time.sleep(60)
continue
# Print the final RMS amplitude
print(f"RMS Amplitude: {rms:.6f}")
debug_print(f"Current background noise (RMS amplitude): {rms:.6f}")
debug_print(f"Calculated SPL: {spl:.2f} dB")
# Detect clipping and reduce gain if needed
if overload or clipping:
current_time = time.time()
# Adjust gain if overload detected and sufficient time has passed
if overload and current_time - last_gain_adjustment_time >= GAIN_STABILIZATION_DELAY:
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = current_gain_db - CLIPPING_REDUCTION_DB
if NEW_GAIN_DB < MIN_GAIN_DB:
NEW_GAIN_DB = MIN_GAIN_DB
success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB)
if success:
print(f"Overload or clipping detected. Reduced gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to overload or clipping.")
else:
print("Failed to reduce gain due to overload or clipping.")
# Skip trend adjustment in case of overload or clipping
time.sleep(60)
continue
NEW_GAIN_DB = max(current_gain_db - CLIPPING_REDUCTION_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to clipping.", "warning")
last_gain_adjustment_time = current_time
# Handle THD if SPL is above threshold
if spl >= THD_FUNDAMENTAL_THRESHOLD_DB:
if thd > MAX_THD_PERCENTAGE:
debug_print(f"High THD detected: {thd:.2f}%", "warning")
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"High THD detected. Decreased gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high THD.", "info")
last_gain_adjustment_time = current_time
else:
debug_print("THD within acceptable limits.", "info")
# Determine the noise trend
if rms > NOISE_THRESHOLD_HIGH:
CURRENT_TREND = 1
@@ -736,7 +515,7 @@ def main():
else:
CURRENT_TREND = 0
debug_print(f"Current trend: {CURRENT_TREND}")
debug_print(f"Current trend: {CURRENT_TREND}", "info")
if CURRENT_TREND != 0:
if CURRENT_TREND == PREVIOUS_TREND:
@@ -747,7 +526,7 @@ def main():
else:
TREND_COUNT = 0
debug_print(f"Trend count: {TREND_COUNT}")
debug_print(f"Trend count: {TREND_COUNT}", "info")
current_gain_db = get_gain_db(MICROPHONE_NAME)
@@ -756,34 +535,27 @@ def main():
time.sleep(60)
continue
debug_print(f"Current gain: {current_gain_db} dB")
debug_print(f"Current gain: {current_gain_db} dB", "info")
if TREND_COUNT >= TREND_COUNT_THRESHOLD:
# Adjust gain based on noise trend if threshold count is reached and stabilization delay has passed
if TREND_COUNT >= TREND_COUNT_THRESHOLD and current_time - last_gain_adjustment_time >= GAIN_STABILIZATION_DELAY:
if CURRENT_TREND == 1:
# Decrease gain by DECREASE_GAIN_STEP_DB dB
NEW_GAIN_DB = current_gain_db - DECREASE_GAIN_STEP_DB
if NEW_GAIN_DB < MIN_GAIN_DB:
NEW_GAIN_DB = MIN_GAIN_DB
success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB)
if success:
NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Background noise high. Decreased gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high noise.")
else:
print("Failed to decrease gain.")
debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high noise.", "info")
last_gain_adjustment_time = current_time
elif CURRENT_TREND == -1:
# Increase gain by INCREASE_GAIN_STEP_DB dB
NEW_GAIN_DB = current_gain_db + INCREASE_GAIN_STEP_DB
if NEW_GAIN_DB > MAX_GAIN_DB:
NEW_GAIN_DB = MAX_GAIN_DB
success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB)
if success:
NEW_GAIN_DB = min(current_gain_db + INCREASE_GAIN_STEP_DB, MAX_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Background noise low. Increased gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain increased to {NEW_GAIN_DB} dB due to low noise.")
else:
print("Failed to increase gain.")
debug_print(f"Gain increased to {NEW_GAIN_DB} dB due to low noise.", "info")
last_gain_adjustment_time = current_time
TREND_COUNT = 0
else:
debug_print("No gain adjustment needed based on noise trend.")
debug_print("No gain adjustment needed based on noise trend.", "info")
# Sleep for 1 minute before the next iteration
time.sleep(60)