Update DOCS.md

This commit is contained in:
Alexandre
2024-10-31 21:50:49 +01:00
committed by GitHub
parent c9585859f3
commit c50f8d66e5

View File

@@ -420,9 +420,6 @@ Dependencies:
- ffmpeg (installed and accessible in PATH)
- amixer (for microphone gain control)
Author: OpenAI ChatGPT
Date: 2024-10-28 (Updated)
Changelog:
- 2024-10-27: Increased sampling rate to 48,000 Hz.
- 2024-10-27: Extended THD calculation over the full frequency range.
@@ -431,8 +428,8 @@ Changelog:
- 2024-10-27: Enhanced debug output with logging levels.
- 2024-10-28: Added summary log mode for simplified output.
- 2024-10-28: Removed gain stabilization delay for immediate gain adjustments.
- 2024-10-28: Max gain capped at 40 dB, suppressed `amixer` logging.
- 2024-10-28: Implemented sampling rate reduction, capture duration reduction, scipy.fft usage, lower filter order, and multiprocessing for efficiency.
- 2024-10-31: Max gain capped at 40 dB, suppressed `amixer` logging.
"""
import subprocess
@@ -492,25 +489,17 @@ FILTER_ORDER = 3 # Reduced from 5 to 3 for efficiency
# -----------------------------------------------------------------------
def debug_print(msg, level="info"):
"""
Prints debug messages with logging levels if DEBUG mode is enabled.
:param msg: The debug message to print.
:param level: Logging level - "info", "warning", "error".
"""
if DEBUG and not SUMMARY_MODE:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(f"[{current_time}] [{level.upper()}] {msg}")
def summary_log(current_gain, clipping, rms_amplitude, thd_percentage):
"""
Outputs a summary log with date, time, current gain, clipping status, background noise, and THD.
:param current_gain: Current microphone gain in dB.
:param clipping: Clipping status (yes/no).
:param rms_amplitude: Background noise RMS amplitude.
:param thd_percentage: THD in percentage.
"""
if SUMMARY_MODE:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -518,286 +507,49 @@ def summary_log(current_gain, clipping, rms_amplitude, thd_percentage):
print(f"{current_time} | Gain: {current_gain:.1f} dB | Clipping: {clipping_status} | "
f"Noise: {rms_amplitude:.5f} | THD: {thd_percentage:.2f}%")
def get_gain_db(mic_name):
"""
Retrieves the current gain setting of the specified microphone using amixer.
Considers only the integer part of the gain.
"""
cmd = ['amixer', 'sget', mic_name]
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode()
match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output)
match = re.search(r'\[(-?\d+)', output) # Match only integer part
if match:
return float(match.group(1))
return int(match.group(1))
else:
return None
except subprocess.CalledProcessError:
return None
def set_gain_db(mic_name, gain_db):
"""
Sets the gain of the specified microphone using amixer.
Suppresses logging related to this action.
Suppresses output by redirecting to /dev/null.
"""
if gain_db > MAX_GAIN_DB:
return False # Do not exceed max gain
cmd = ['amixer', 'sset', mic_name, f'{gain_db}dB']
try:
subprocess.check_call(cmd, stderr=subprocess.STDOUT)
subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return True
except subprocess.CalledProcessError:
return False
def find_fundamental_frequency(fft_freqs, fft_magnitude, min_freq=100, max_freq=5000):
"""
Dynamically finds the fundamental frequency within a specified range.
"""
idx_min = np.searchsorted(fft_freqs, min_freq)
idx_max = np.searchsorted(fft_freqs, max_freq)
if idx_max <= idx_min:
return None, 0
search_magnitude = fft_magnitude[idx_min:idx_max]
search_freqs = fft_freqs[idx_min:idx_max]
peaks, properties = find_peaks(search_magnitude, height=np.max(search_magnitude) * 0.1)
if len(peaks) == 0:
return None, 0
max_peak_idx = np.argmax(properties['peak_heights'])
fundamental_freq = search_freqs[peaks[max_peak_idx]]
fundamental_amplitude = search_magnitude[peaks[max_peak_idx]]
debug_print(f"Detected fundamental frequency: {fundamental_freq:.2f} Hz with amplitude {fundamental_amplitude:.4f}", "info")
return fundamental_freq, fundamental_amplitude
def thd_calculation(audio, sampling_rate, num_harmonics=5):
"""
Calculates Total Harmonic Distortion (THD) for the audio signal.
"""
fft_vals = rfft(audio)
fft_freqs = rfftfreq(len(audio), 1 / sampling_rate)
fft_magnitude = np.abs(fft_vals)
fundamental_freq, fundamental_amplitude = find_fundamental_frequency(fft_freqs, fft_magnitude)
if fundamental_freq is None or fundamental_amplitude < 1e-6:
debug_print("Fundamental frequency not detected or amplitude too low. Skipping THD calculation.", "warning")
return 0.0
harmonic_amplitudes = []
for n in range(2, num_harmonics + 1):
harmonic_freq = n * fundamental_freq
if harmonic_freq > sampling_rate / 2:
break
harmonic_idx = np.argmin(np.abs(fft_freqs - harmonic_freq))
harmonic_amp = fft_magnitude[harmonic_idx]
harmonic_amplitudes.append(harmonic_amp)
debug_print(f"Harmonic {n} frequency: {harmonic_freq:.2f} Hz, amplitude: {harmonic_amp:.4f}", "info")
harmonic_sum = np.sqrt(np.sum(np.square(harmonic_amplitudes)))
thd = (harmonic_sum / fundamental_amplitude) * 100 if fundamental_amplitude > 0 else 0.0
debug_print(f"THD Calculation: {thd:.2f}%", "info")
return thd
def calculate_spl(audio, mic_sensitivity_db):
"""
Calculates the Sound Pressure Level (SPL) from the audio signal.
"""
rms_amplitude = np.sqrt(np.mean(audio ** 2))
if rms_amplitude == 0:
debug_print("RMS amplitude is zero. SPL cannot be calculated.", "warning")
return -np.inf
mic_sensitivity_linear = 10 ** (mic_sensitivity_db / 20)
pressure = rms_amplitude / mic_sensitivity_linear
spl = 20 * np.log10(pressure / REFERENCE_PRESSURE)
debug_print(f"Calculated SPL: {spl:.2f} dB", "info")
return spl
def detect_microphone_overload(spl, mic_clipping_spl):
"""
Detects if the calculated SPL is approaching the microphone's clipping SPL.
"""
if spl >= mic_clipping_spl - 3:
debug_print("Microphone overload detected.", "warning")
return True
return False
def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate):
"""
Captures audio from an RTSP stream, calculates RMS, THD, and SPL, and detects microphone overload.
"""
cmd = [
'ffmpeg', '-loglevel', 'error', '-rtsp_transport', 'tcp', '-i', rtsp_url,
'-vn', '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(sampling_rate),
'-ac', '1', '-t', str(AUDIO_CAPTURE_DURATION), '-'
]
retries = 3
for attempt in range(retries):
try:
debug_print(f"Attempt {attempt + 1} to capture audio from {rtsp_url}", "info")
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode != 0:
debug_print(f"ffmpeg failed with error: {stderr.decode()}", "error")
time.sleep(5)
continue
audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0
debug_print(f"Captured {len(audio)} samples from audio stream.", "info")
if len(audio) == 0:
debug_print("No audio data captured.", "warning")
time.sleep(5)
continue
filtered_audio = sosfilt(bandpass_sos, audio)
rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2))
thd_percentage = thd_calculation(filtered_audio, sampling_rate)
spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB)
overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL)
return rms_amplitude, thd_percentage, spl, overload
except Exception as e:
debug_print(f"Exception during audio processing: {e}", "error")
time.sleep(5) # Small delay before retrying
return None, None, None, False
def audio_capture_process(queue, rtsp_url, sos, sampling_rate):
"""
Separate process for capturing and processing audio.
"""
while True:
rms, thd, spl, overload = calculate_noise_rms_and_thd(rtsp_url, sos, sampling_rate)
queue.put((rms, thd, spl, overload))
time.sleep(60) # Wait for the next capture cycle
def main():
"""
Main loop that continuously monitors background noise, detects clipping, calculates THD,
and adjusts microphone gain with multiprocessing for audio capture and processing.
"""
TREND_COUNT = 0
PREVIOUS_TREND = 0
# Precompute bandpass filter coefficients with updated SAMPLING_RATE and lower FILTER_ORDER
sos = butter(FILTER_ORDER, [LOWCUT, HIGHCUT], btype='band', fs=SAMPLING_RATE, output='sos')
# Set the microphone gain to the maximum gain at the start
success = set_gain_db(MICROPHONE_NAME, MAX_GAIN_DB)
if success:
print(f"Microphone gain set to {MAX_GAIN_DB} dB at start.")
# Example usage
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
if current_gain_db < MAX_GAIN_DB:
new_gain_db = min(current_gain_db + INCREASE_GAIN_STEP_DB, MAX_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, new_gain_db):
print(f"Gain increased to {new_gain_db} dB")
else:
print("Gain is already at maximum and will not be increased.")
else:
print("Failed to set microphone gain at start. Exiting.")
sys.exit(1)
# Initialize multiprocessing queue
manager = mp.Manager()
queue = manager.Queue()
# Start audio capture and processing in a separate process
p = mp.Process(target=audio_capture_process, args=(queue, RTSP_URL, sos, SAMPLING_RATE))
p.start()
debug_print("Started audio capture and processing process.", "info")
while True:
if not queue.empty():
rms, thd, spl, overload = queue.get()
else:
time.sleep(1)
continue
if rms is None:
print("Failed to compute noise RMS. Retrying in 1 minute...")
continue
# Adjust gain if overload detected
if overload:
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = max(current_gain_db - CLIPPING_REDUCTION_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB")
current_gain_db = NEW_GAIN_DB # Update current gain
# Output summary log
summary_log(current_gain_db if current_gain_db else MIN_GAIN_DB, True, rms, thd)
continue # Skip trend adjustment in case of clipping
# Handle THD if SPL is above threshold
if spl >= THD_FUNDAMENTAL_THRESHOLD_DB:
if thd > MAX_THD_PERCENTAGE:
debug_print(f"High THD detected: {thd:.2f}%", "warning")
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"High THD detected. Decreased gain to {NEW_GAIN_DB} dB")
current_gain_db = NEW_GAIN_DB # Update current gain
else:
debug_print("THD within acceptable limits.", "info")
else:
debug_print("SPL below THD calculation threshold. Skipping THD check.", "info")
# Determine the noise trend
if rms > NOISE_THRESHOLD_HIGH:
CURRENT_TREND = 1
elif rms < NOISE_THRESHOLD_LOW:
CURRENT_TREND = -1
else:
CURRENT_TREND = 0
debug_print(f"Current trend: {CURRENT_TREND}", "info")
if CURRENT_TREND != 0:
if CURRENT_TREND == PREVIOUS_TREND:
TREND_COUNT += 1
else:
TREND_COUNT = 1
PREVIOUS_TREND = CURRENT_TREND
else:
TREND_COUNT = 0
debug_print(f"Trend count: {TREND_COUNT}", "info")
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is None:
print("Failed to get current gain level. Retrying in 1 minute...")
continue
# Output summary log for the current state
summary_log(current_gain_db, overload, rms, thd)
# Adjust gain based on noise trend if threshold count is reached
if TREND_COUNT >= TREND_COUNT_THRESHOLD:
if CURRENT_TREND == 1:
# Decrease gain by DECREASE_GAIN_STEP_DB dB
NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Background noise high. Decreased gain to {NEW_GAIN_DB} dB")
current_gain_db = NEW_GAIN_DB # Update current gain
TREND_COUNT = 0
elif CURRENT_TREND == -1:
# Increase gain by INCREASE_GAIN_STEP_DB dB
NEW_GAIN_DB = min(current_gain_db + INCREASE_GAIN_STEP_DB, MAX_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Background noise low. Increased gain to {NEW_GAIN_DB} dB")
current_gain_db = NEW_GAIN_DB # Update current gain
TREND_COUNT = 0
else:
debug_print("No gain adjustment needed based on noise trend.", "info")
# Sleep for 1 minute before the next iteration
time.sleep(60)
print("Failed to retrieve current gain level.")
if __name__ == "__main__":
main()