Update DOCS.md

This commit is contained in:
Alexandre
2024-10-31 22:15:36 +01:00
committed by GitHub
parent c50f8d66e5
commit 9a6ddc450f

View File

@@ -408,7 +408,7 @@ Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py"
#!/usr/bin/env python3
"""
Microphone Gain Adjustment Script with THD and Overload Detection
def set_gain_db
This script captures audio from an RTSP stream, processes it to calculate the RMS
within the 2000-8000 Hz frequency band, detects clipping, calculates Total Harmonic
Distortion (THD) over the full frequency range, and adjusts the microphone gain based
@@ -420,6 +420,9 @@ Dependencies:
- ffmpeg (installed and accessible in PATH)
- amixer (for microphone gain control)
Author: OpenAI ChatGPT
Date: 2024-10-28 (Updated)
Changelog:
- 2024-10-27: Increased sampling rate to 48,000 Hz.
- 2024-10-27: Extended THD calculation over the full frequency range.
@@ -428,8 +431,8 @@ Changelog:
- 2024-10-27: Enhanced debug output with logging levels.
- 2024-10-28: Added summary log mode for simplified output.
- 2024-10-28: Removed gain stabilization delay for immediate gain adjustments.
- 2024-10-28: Max gain capped at 40 dB, suppressed amixer logging.
- 2024-10-28: Implemented sampling rate reduction, capture duration reduction, scipy.fft usage, lower filter order, and multiprocessing for efficiency.
- 2024-10-31: Max gain capped at 40 dB, suppressed `amixer` logging.
"""
import subprocess
@@ -489,17 +492,25 @@ FILTER_ORDER = 3 # Reduced from 5 to 3 for efficiency
# -----------------------------------------------------------------------
def debug_print(msg, level="info"):
"""
Prints debug messages with logging levels if DEBUG mode is enabled.
:param msg: The debug message to print.
:param level: Logging level - "info", "warning", "error".
"""
if DEBUG and not SUMMARY_MODE:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(f"[{current_time}] [{level.upper()}] {msg}")
def summary_log(current_gain, clipping, rms_amplitude, thd_percentage):
"""
Outputs a summary log with date, time, current gain, clipping status, background noise, and THD.
:param current_gain: Current microphone gain in dB.
:param clipping: Clipping status (yes/no).
:param rms_amplitude: Background noise RMS amplitude.
:param thd_percentage: THD in percentage.
"""
if SUMMARY_MODE:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -507,49 +518,288 @@ def summary_log(current_gain, clipping, rms_amplitude, thd_percentage):
print(f"{current_time} | Gain: {current_gain:.1f} dB | Clipping: {clipping_status} | "
f"Noise: {rms_amplitude:.5f} | THD: {thd_percentage:.2f}%")
def get_gain_db(mic_name):
"""
Retrieves the current gain setting of the specified microphone using amixer.
Considers only the integer part of the gain.
"""
cmd = ['amixer', 'sget', mic_name]
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode()
match = re.search(r'\[(-?\d+)', output) # Match only integer part
match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output)
if match:
return int(match.group(1))
return float(match.group(1))
else:
return None
except subprocess.CalledProcessError:
return None
def set_gain_db(mic_name, gain_db):
"""
Sets the gain of the specified microphone using amixer.
Suppresses output by redirecting to /dev/null.
"""
if gain_db > MAX_GAIN_DB:
return False # Do not exceed max gain
if int(gain_db) > MAX_GAIN_DB:
return True # Do not exceed max gain
cmd = ['amixer', 'sset', mic_name, f'{gain_db}dB']
try:
subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.check_call(cmd, stderr=subprocess.STDOUT, stdout=subprocess.DEVNULL)
return True
except subprocess.CalledProcessError:
return False
def find_fundamental_frequency(fft_freqs, fft_magnitude, min_freq=100, max_freq=5000):
"""
Dynamically finds the fundamental frequency within a specified range.
"""
idx_min = np.searchsorted(fft_freqs, min_freq)
idx_max = np.searchsorted(fft_freqs, max_freq)
if idx_max <= idx_min:
return None, 0
search_magnitude = fft_magnitude[idx_min:idx_max]
search_freqs = fft_freqs[idx_min:idx_max]
peaks, properties = find_peaks(search_magnitude, height=np.max(search_magnitude) * 0.1)
if len(peaks) == 0:
return None, 0
max_peak_idx = np.argmax(properties['peak_heights'])
fundamental_freq = search_freqs[peaks[max_peak_idx]]
fundamental_amplitude = search_magnitude[peaks[max_peak_idx]]
debug_print(f"Detected fundamental frequency: {fundamental_freq:.2f} Hz with amplitude {fundamental_amplitude:.4f}", "info")
return fundamental_freq, fundamental_amplitude
def thd_calculation(audio, sampling_rate, num_harmonics=5):
"""
Calculates Total Harmonic Distortion (THD) for the audio signal.
"""
fft_vals = rfft(audio)
fft_freqs = rfftfreq(len(audio), 1 / sampling_rate)
fft_magnitude = np.abs(fft_vals)
fundamental_freq, fundamental_amplitude = find_fundamental_frequency(fft_freqs, fft_magnitude)
if fundamental_freq is None or fundamental_amplitude < 1e-6:
debug_print("Fundamental frequency not detected or amplitude too low. Skipping THD calculation.", "warning")
return 0.0
harmonic_amplitudes = []
for n in range(2, num_harmonics + 1):
harmonic_freq = n * fundamental_freq
if harmonic_freq > sampling_rate / 2:
break
harmonic_idx = np.argmin(np.abs(fft_freqs - harmonic_freq))
harmonic_amp = fft_magnitude[harmonic_idx]
harmonic_amplitudes.append(harmonic_amp)
debug_print(f"Harmonic {n} frequency: {harmonic_freq:.2f} Hz, amplitude: {harmonic_amp:.4f}", "info")
harmonic_sum = np.sqrt(np.sum(np.square(harmonic_amplitudes)))
thd = (harmonic_sum / fundamental_amplitude) * 100 if fundamental_amplitude > 0 else 0.0
debug_print(f"THD Calculation: {thd:.2f}%", "info")
return thd
def calculate_spl(audio, mic_sensitivity_db):
"""
Calculates the Sound Pressure Level (SPL) from the audio signal.
"""
rms_amplitude = np.sqrt(np.mean(audio ** 2))
if rms_amplitude == 0:
debug_print("RMS amplitude is zero. SPL cannot be calculated.", "warning")
return -np.inf
mic_sensitivity_linear = 10 ** (mic_sensitivity_db / 20)
pressure = rms_amplitude / mic_sensitivity_linear
spl = 20 * np.log10(pressure / REFERENCE_PRESSURE)
debug_print(f"Calculated SPL: {spl:.2f} dB", "info")
return spl
def detect_microphone_overload(spl, mic_clipping_spl):
"""
Detects if the calculated SPL is approaching the microphone's clipping SPL.
"""
if spl >= mic_clipping_spl - 3:
debug_print("Microphone overload detected.", "warning")
return True
return False
def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate):
"""
Captures audio from an RTSP stream, calculates RMS, THD, and SPL, and detects microphone overload.
"""
cmd = [
'ffmpeg', '-loglevel', 'error', '-rtsp_transport', 'tcp', '-i', rtsp_url,
'-vn', '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(sampling_rate),
'-ac', '1', '-t', str(AUDIO_CAPTURE_DURATION), '-'
]
retries = 3
for attempt in range(retries):
try:
debug_print(f"Attempt {attempt + 1} to capture audio from {rtsp_url}", "info")
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode != 0:
debug_print(f"ffmpeg failed with error: {stderr.decode()}", "error")
time.sleep(5)
continue
audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0
debug_print(f"Captured {len(audio)} samples from audio stream.", "info")
if len(audio) == 0:
debug_print("No audio data captured.", "warning")
time.sleep(5)
continue
filtered_audio = sosfilt(bandpass_sos, audio)
rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2))
thd_percentage = thd_calculation(filtered_audio, sampling_rate)
spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB)
overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL)
return rms_amplitude, thd_percentage, spl, overload
except Exception as e:
debug_print(f"Exception during audio processing: {e}", "error")
time.sleep(5) # Small delay before retrying
return None, None, None, False
def audio_capture_process(queue, rtsp_url, sos, sampling_rate):
"""
Separate process for capturing and processing audio.
"""
while True:
rms, thd, spl, overload = calculate_noise_rms_and_thd(rtsp_url, sos, sampling_rate)
queue.put((rms, thd, spl, overload))
time.sleep(60) # Wait for the next capture cycle
def main():
# Example usage
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
if current_gain_db < MAX_GAIN_DB:
new_gain_db = min(current_gain_db + INCREASE_GAIN_STEP_DB, MAX_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, new_gain_db):
print(f"Gain increased to {new_gain_db} dB")
else:
print("Gain is already at maximum and will not be increased.")
"""
Main loop that continuously monitors background noise, detects clipping, calculates THD,
and adjusts microphone gain with multiprocessing for audio capture and processing.
"""
TREND_COUNT = 0
PREVIOUS_TREND = 0
# Precompute bandpass filter coefficients with updated SAMPLING_RATE and lower FILTER_ORDER
sos = butter(FILTER_ORDER, [LOWCUT, HIGHCUT], btype='band', fs=SAMPLING_RATE, output='sos')
# Set the microphone gain to the maximum gain at the start
success = set_gain_db(MICROPHONE_NAME, MAX_GAIN_DB)
if success:
print(f"Microphone gain set to {MAX_GAIN_DB} dB at start.")
else:
print("Failed to retrieve current gain level.")
print("Failed to set microphone gain at start. Exiting.")
sys.exit(1)
# Initialize multiprocessing queue
manager = mp.Manager()
queue = manager.Queue()
# Start audio capture and processing in a separate process
p = mp.Process(target=audio_capture_process, args=(queue, RTSP_URL, sos, SAMPLING_RATE))
p.start()
debug_print("Started audio capture and processing process.", "info")
while True:
if not queue.empty():
rms, thd, spl, overload = queue.get()
else:
time.sleep(1)
continue
if rms is None:
print("Failed to compute noise RMS. Retrying in 1 minute...")
continue
# Adjust gain if overload detected
if overload:
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = max(current_gain_db - CLIPPING_REDUCTION_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB")
current_gain_db = NEW_GAIN_DB # Update current gain
# Output summary log
summary_log(current_gain_db if current_gain_db else MIN_GAIN_DB, True, rms, thd)
continue # Skip trend adjustment in case of clipping
# Handle THD if SPL is above threshold
if spl >= THD_FUNDAMENTAL_THRESHOLD_DB:
if thd > MAX_THD_PERCENTAGE:
debug_print(f"High THD detected: {thd:.2f}%", "warning")
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"High THD detected. Decreased gain to {NEW_GAIN_DB} dB")
current_gain_db = NEW_GAIN_DB # Update current gain
else:
debug_print("THD within acceptable limits.", "info")
else:
debug_print("SPL below THD calculation threshold. Skipping THD check.", "info")
# Determine the noise trend
if rms > NOISE_THRESHOLD_HIGH:
CURRENT_TREND = 1
elif rms < NOISE_THRESHOLD_LOW:
CURRENT_TREND = -1
else:
CURRENT_TREND = 0
debug_print(f"Current trend: {CURRENT_TREND}", "info")
if CURRENT_TREND != 0:
if CURRENT_TREND == PREVIOUS_TREND:
TREND_COUNT += 1
else:
TREND_COUNT = 1
PREVIOUS_TREND = CURRENT_TREND
else:
TREND_COUNT = 0
debug_print(f"Trend count: {TREND_COUNT}", "info")
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is None:
print("Failed to get current gain level. Retrying in 1 minute...")
continue
# Output summary log for the current state
summary_log(current_gain_db, overload, rms, thd)
# Adjust gain based on noise trend if threshold count is reached
if TREND_COUNT >= TREND_COUNT_THRESHOLD:
if CURRENT_TREND == 1:
# Decrease gain by DECREASE_GAIN_STEP_DB dB
NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Background noise high. Decreased gain to {NEW_GAIN_DB} dB")
current_gain_db = NEW_GAIN_DB # Update current gain
TREND_COUNT = 0
elif CURRENT_TREND == -1:
# Increase gain by INCREASE_GAIN_STEP_DB dB
NEW_GAIN_DB = min(current_gain_db + INCREASE_GAIN_STEP_DB, MAX_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Background noise low. Increased gain to {NEW_GAIN_DB} dB")
current_gain_db = NEW_GAIN_DB # Update current gain
TREND_COUNT = 0
else:
debug_print("No gain adjustment needed based on noise trend.", "info")
# Sleep for 1 minute before the next iteration
time.sleep(60)
if __name__ == "__main__":
main()