Update DOCS.md

This commit is contained in:
Alexandre
2024-10-18 10:38:25 +02:00
committed by GitHub
parent 813449cf2f
commit 59dc250752

View File

@@ -393,26 +393,18 @@ Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py"
"""
Microphone Gain Adjustment Script
This script captures audio from an RTSP stream using GStreamer if available,
or falls back to ffmpeg if GStreamer is not installed. The script processes
the audio to calculate the RMS within the 2000-4000 Hz frequency band and
adjusts the microphone gain based on predefined noise thresholds and trends.
This script captures audio from an RTSP stream, processes it to calculate the RMS
within the 2000-4000 Hz frequency band, detects clipping, and adjusts the microphone
gain based on predefined noise thresholds, trends, and clipping detection.
Dependencies:
- numpy
- scipy
- ffmpeg or GStreamer (installed and accessible in PATH)
- ffmpeg (installed and accessible in PATH)
- amixer (for microphone gain control)
Author: OpenAI ChatGPT
Date: 2024-04-27
Changelog:
- 2024-04-27: Initial version of the script that captures RTSP stream, calculates RMS, and adjusts gain.
- 2024-10-17: Added support for GStreamer as the preferred RTSP stream capture tool, with ffmpeg as a fallback.
Implemented a debug log system to provide detailed logs with timestamps.
Added trend-based microphone gain adjustment and noise RMS calculation.
Enhanced error handling for both GStreamer and ffmpeg.
"""
import subprocess
@@ -429,6 +421,7 @@ MIN_GAIN_DB = 20 # Minimum gain in dB
MAX_GAIN_DB = 45 # Maximum gain in dB
DECREASE_GAIN_STEP_DB = 1 # Gain decrease step in dB
INCREASE_GAIN_STEP_DB = 5 # Gain increase step in dB
CLIPPING_REDUCTION_DB = 3 # Reduction in dB if clipping is detected
# Noise Thresholds
NOISE_THRESHOLD_HIGH = 0.001 # Upper threshold for noise RMS amplitude
@@ -445,6 +438,7 @@ DEBUG = 1
# -----------------------------------------------------------------------
def debug(msg):
"""
Prints debug messages if DEBUG mode is enabled.
@@ -466,6 +460,7 @@ def get_gain_db(mic_name):
cmd = ['amixer', 'sget', mic_name]
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode()
# Regex to find patterns like [30.00dB]
match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output)
if match:
gain_db = float(match.group(1))
@@ -497,84 +492,29 @@ def set_gain_db(mic_name, gain_db):
return False
def check_gstreamer_available():
def detect_clipping(audio):
"""
Checks if GStreamer is available on the system.
Detects if clipping occurs in the audio signal.
:param audio: The audio signal as a numpy array.
:return: True if clipping is detected, False otherwise.
"""
try:
subprocess.check_call(['which', 'gst-launch-1.0'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
debug("GStreamer is available.")
CLIPPING_THRESHOLD = 1.0 # Normalized PCM16 max value is ±1.0
if np.any(audio >= CLIPPING_THRESHOLD) or np.any(audio <= -CLIPPING_THRESHOLD):
debug("Clipping detected in audio signal.")
return True
except subprocess.CalledProcessError:
debug("GStreamer is not available.")
return False
return False
def calculate_noise_rms_gstream(rtsp_url, bandpass_sos, num_bins=5):
def calculate_noise_rms(rtsp_url, bandpass_sos, num_bins=5):
"""
Captures audio using GStreamer from an RTSP stream, applies a bandpass filter,
divides the audio into segments, and calculates the RMS of the quietest segment.
Captures audio from an RTSP stream, applies a bandpass filter, divides the
audio into segments, and calculates the RMS of the quietest segment. Also detects clipping.
:param rtsp_url: The RTSP stream URL.
:param bandpass_sos: Precomputed bandpass filter coefficients (Second-Order Sections).
:param num_bins: Number of segments to divide the audio into.
:return: The RMS amplitude of the quietest segment as a float, or None on failure.
"""
cmd = [
'gst-launch-1.0',
'rtspsrc', f'location={rtsp_url}', '!',
'decodebin', '!', 'audioconvert', '!',
'audioresample', '!', 'audio/x-raw,rate=32000,channels=1',
'!', 'filesink', 'location=/dev/stdout'
]
try:
debug(f"Starting GStreamer audio capture from {rtsp_url}")
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode != 0:
debug(f"GStreamer failed with error: {stderr.decode()}")
return None
audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0
debug(f"Captured {len(audio)} samples from GStreamer.")
if len(audio) == 0:
debug("No audio data captured.")
return None
filtered = sosfilt(bandpass_sos, audio)
debug("Applied bandpass filter to audio data.")
total_samples = len(filtered)
bin_size = total_samples // num_bins
if bin_size == 0:
debug("Bin size is 0; insufficient audio data.")
return 0.0
trimmed_filtered = filtered[:bin_size * num_bins]
segments = trimmed_filtered.reshape(num_bins, bin_size)
debug(f"Divided audio into {num_bins} bins of {bin_size} samples each.")
rms_values = np.sqrt(np.mean(segments ** 2, axis=1))
min_rms = rms_values.min()
debug(f"Minimum RMS value among segments: {min_rms}")
return min_rms
except Exception as e:
debug(f"Exception during noise RMS calculation with GStreamer: {e}")
return None
def calculate_noise_rms_ffmpeg(rtsp_url, bandpass_sos, num_bins=5):
"""
Captures audio using FFmpeg from an RTSP stream, applies a bandpass filter,
divides the audio into segments, and calculates the RMS of the quietest segment.
:param rtsp_url: The RTSP stream URL.
:param bandpass_sos: Precomputed bandpass filter coefficients (Second-Order Sections).
:param num_bins: Number of segments to divide the audio into.
:return: The RMS amplitude of the quietest segment as a float, or None on failure.
:return: Tuple containing the RMS amplitude of the quietest segment and a boolean indicating clipping.
"""
cmd = [
'ffmpeg',
@@ -591,68 +531,73 @@ def calculate_noise_rms_ffmpeg(rtsp_url, bandpass_sos, num_bins=5):
]
try:
debug(f"Starting FFmpeg audio capture from {rtsp_url}")
debug(f"Starting audio capture from {rtsp_url}")
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode != 0:
debug(f"FFmpeg failed with error: {stderr.decode()}")
return None
debug(f"ffmpeg failed with error: {stderr.decode()}")
return None, False
# Convert raw PCM data to numpy array
audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0
debug(f"Captured {len(audio)} samples from FFmpeg.")
debug(f"Captured {len(audio)} samples from audio stream.")
if len(audio) == 0:
debug("No audio data captured.")
return None
return None, False
# Check for clipping
is_clipping = detect_clipping(audio)
# Apply bandpass filter
filtered = sosfilt(bandpass_sos, audio)
debug("Applied bandpass filter to audio data.")
# Divide into num_bins
total_samples = len(filtered)
bin_size = total_samples // num_bins
if bin_size == 0:
debug("Bin size is 0; insufficient audio data.")
return 0.0
return 0.0, is_clipping
trimmed_filtered = filtered[:bin_size * num_bins]
trimmed_length = bin_size * num_bins
trimmed_filtered = filtered[:trimmed_length]
segments = trimmed_filtered.reshape(num_bins, bin_size)
debug(f"Divided audio into {num_bins} bins of {bin_size} samples each.")
# Calculate RMS for each segment
rms_values = np.sqrt(np.mean(segments ** 2, axis=1))
debug(f"Calculated RMS values for each segment: {rms_values}")
# Return the minimum RMS value and clipping status
min_rms = rms_values.min()
debug(f"Minimum RMS value among segments: {min_rms}")
return min_rms
return min_rms, is_clipping
except Exception as e:
debug(f"Exception during noise RMS calculation with FFmpeg: {e}")
return None
def calculate_noise_rms(rtsp_url, bandpass_sos, num_bins=5):
"""
Attempts to capture audio from an RTSP stream using GStreamer first,
and falls back to FFmpeg if GStreamer is unavailable.
"""
if check_gstreamer_available():
return calculate_noise_rms_gstream(rtsp_url, bandpass_sos, num_bins)
else:
return calculate_noise_rms_ffmpeg(rtsp_url, bandpass_sos, num_bins)
debug(f"Exception during noise RMS calculation: {e}")
return None, False
def main():
"""
Main loop that continuously monitors background noise and adjusts microphone gain.
Main loop that continuously monitors background noise, detects clipping, and adjusts microphone gain.
"""
TREND_COUNT = 0
PREVIOUS_TREND = 0
LOWCUT = 2000
HIGHCUT = 4000
FILTER_ORDER = 5
# Precompute the bandpass filter coefficients
LOWCUT = 2000 # Lower frequency bound in Hz
HIGHCUT = 8000 # Upper frequency bound in Hz
FILTER_ORDER = 5 # Order of the Butterworth filter
sos = butter(FILTER_ORDER, [LOWCUT, HIGHCUT], btype='band', fs=44100, output='sos')
debug("Precomputed Butterworth bandpass filter coefficients.")
# Set the microphone gain to the maximum gain at the start
success = set_gain_db(MICROPHONE_NAME, MAX_GAIN_DB)
if success:
print(f"Microphone gain set to {MAX_GAIN_DB} dB at start.")
@@ -661,16 +606,40 @@ def main():
return
while True:
min_rms = calculate_noise_rms(RTSP_URL, sos, num_bins=5)
min_rms, is_clipping = calculate_noise_rms(RTSP_URL, sos, num_bins=5)
if min_rms is None:
print("Failed to compute noise RMS. Retrying in 1 minute...")
time.sleep(60)
continue
if not isinstance(min_rms, (float, int)):
print(f"Invalid noise RMS output detected: {min_rms}. Retrying in 1 minute...")
time.sleep(60)
continue
# Print the final converted RMS amplitude (only once)
print(f"Converted RMS Amplitude: {min_rms}")
debug(f"Current background noise (RMS amplitude): {min_rms}")
# Detect clipping and reduce gain if needed
CURRENT_GAIN_DB = get_gain_db(MICROPHONE_NAME)
if is_clipping:
NEW_GAIN_DB = CURRENT_GAIN_DB - CLIPPING_REDUCTION_DB
if NEW_GAIN_DB < MIN_GAIN_DB:
NEW_GAIN_DB = MIN_GAIN_DB
success = set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB)
if success:
print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB")
debug(f"Gain reduced to {NEW_GAIN_DB} dB due to clipping.")
else:
print("Failed to reduce gain due to clipping.")
# Skip trend adjustment in case of clipping
time.sleep(60)
continue
# Determine the noise trend
if min_rms > NOISE_THRESHOLD_HIGH:
CURRENT_TREND = 1
elif min_rms < NOISE_THRESHOLD_LOW:
@@ -691,17 +660,9 @@ def main():
debug(f"Trend count: {TREND_COUNT}")
CURRENT_GAIN_DB = get_gain_db(MICROPHONE_NAME)
if CURRENT_GAIN_DB is None:
print("Failed to get current gain level. Retrying in 1 minute...")
time.sleep(60)
continue
debug(f"Current gain: {CURRENT_GAIN_DB} dB")
if TREND_COUNT >= TREND_COUNT_THRESHOLD:
if CURRENT_TREND == 1:
# Decrease gain by 1 dB
NEW_GAIN_DB = CURRENT_GAIN_DB - DECREASE_GAIN_STEP_DB
if NEW_GAIN_DB < MIN_GAIN_DB:
NEW_GAIN_DB = MIN_GAIN_DB
@@ -712,6 +673,7 @@ def main():
else:
print("Failed to set new gain.")
elif CURRENT_TREND == -1:
# Increase gain by 5 dB
NEW_GAIN_DB = CURRENT_GAIN_DB + INCREASE_GAIN_STEP_DB
if NEW_GAIN_DB > MAX_GAIN_DB:
NEW_GAIN_DB = MAX_GAIN_DB
@@ -725,6 +687,7 @@ def main():
else:
debug("No gain adjustment needed.")
# Sleep for 1 minute before the next iteration
time.sleep(60)