From c50f8d66e58e3a30da7514781263f22dbf2c1c8e Mon Sep 17 00:00:00 2001 From: Alexandre <44178713+alexbelgium@users.noreply.github.com> Date: Thu, 31 Oct 2024 21:50:49 +0100 Subject: [PATCH] Update DOCS.md --- birdnet-pi/DOCS.md | 286 +++------------------------------------------ 1 file changed, 19 insertions(+), 267 deletions(-) diff --git a/birdnet-pi/DOCS.md b/birdnet-pi/DOCS.md index 992ab0e86..d9ba31d10 100644 --- a/birdnet-pi/DOCS.md +++ b/birdnet-pi/DOCS.md @@ -420,9 +420,6 @@ Dependencies: - ffmpeg (installed and accessible in PATH) - amixer (for microphone gain control) -Author: OpenAI ChatGPT -Date: 2024-10-28 (Updated) - Changelog: - 2024-10-27: Increased sampling rate to 48,000 Hz. - 2024-10-27: Extended THD calculation over the full frequency range. @@ -431,8 +428,8 @@ Changelog: - 2024-10-27: Enhanced debug output with logging levels. - 2024-10-28: Added summary log mode for simplified output. - 2024-10-28: Removed gain stabilization delay for immediate gain adjustments. -- 2024-10-28: Max gain capped at 40 dB, suppressed `amixer` logging. - 2024-10-28: Implemented sampling rate reduction, capture duration reduction, scipy.fft usage, lower filter order, and multiprocessing for efficiency. +- 2024-10-31: Max gain capped at 40 dB, suppressed `amixer` logging. """ import subprocess @@ -492,25 +489,17 @@ FILTER_ORDER = 3 # Reduced from 5 to 3 for efficiency # ----------------------------------------------------------------------- - def debug_print(msg, level="info"): """ Prints debug messages with logging levels if DEBUG mode is enabled. - :param msg: The debug message to print. - :param level: Logging level - "info", "warning", "error". """ if DEBUG and not SUMMARY_MODE: current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print(f"[{current_time}] [{level.upper()}] {msg}") - def summary_log(current_gain, clipping, rms_amplitude, thd_percentage): """ Outputs a summary log with date, time, current gain, clipping status, background noise, and THD. - :param current_gain: Current microphone gain in dB. - :param clipping: Clipping status (yes/no). - :param rms_amplitude: Background noise RMS amplitude. - :param thd_percentage: THD in percentage. """ if SUMMARY_MODE: current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) @@ -518,286 +507,49 @@ def summary_log(current_gain, clipping, rms_amplitude, thd_percentage): print(f"{current_time} | Gain: {current_gain:.1f} dB | Clipping: {clipping_status} | " f"Noise: {rms_amplitude:.5f} | THD: {thd_percentage:.2f}%") - def get_gain_db(mic_name): """ Retrieves the current gain setting of the specified microphone using amixer. + Considers only the integer part of the gain. """ cmd = ['amixer', 'sget', mic_name] try: output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode() - match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output) + match = re.search(r'\[(-?\d+)', output) # Match only integer part if match: - return float(match.group(1)) + return int(match.group(1)) else: return None except subprocess.CalledProcessError: return None - def set_gain_db(mic_name, gain_db): """ Sets the gain of the specified microphone using amixer. - Suppresses logging related to this action. + Suppresses output by redirecting to /dev/null. """ + if gain_db > MAX_GAIN_DB: + return False # Do not exceed max gain + cmd = ['amixer', 'sset', mic_name, f'{gain_db}dB'] try: - subprocess.check_call(cmd, stderr=subprocess.STDOUT) + subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return True except subprocess.CalledProcessError: return False - -def find_fundamental_frequency(fft_freqs, fft_magnitude, min_freq=100, max_freq=5000): - """ - Dynamically finds the fundamental frequency within a specified range. - """ - idx_min = np.searchsorted(fft_freqs, min_freq) - idx_max = np.searchsorted(fft_freqs, max_freq) - if idx_max <= idx_min: - return None, 0 - - search_magnitude = fft_magnitude[idx_min:idx_max] - search_freqs = fft_freqs[idx_min:idx_max] - peaks, properties = find_peaks(search_magnitude, height=np.max(search_magnitude) * 0.1) - if len(peaks) == 0: - return None, 0 - - max_peak_idx = np.argmax(properties['peak_heights']) - fundamental_freq = search_freqs[peaks[max_peak_idx]] - fundamental_amplitude = search_magnitude[peaks[max_peak_idx]] - - debug_print(f"Detected fundamental frequency: {fundamental_freq:.2f} Hz with amplitude {fundamental_amplitude:.4f}", "info") - return fundamental_freq, fundamental_amplitude - - -def thd_calculation(audio, sampling_rate, num_harmonics=5): - """ - Calculates Total Harmonic Distortion (THD) for the audio signal. - """ - fft_vals = rfft(audio) - fft_freqs = rfftfreq(len(audio), 1 / sampling_rate) - fft_magnitude = np.abs(fft_vals) - fundamental_freq, fundamental_amplitude = find_fundamental_frequency(fft_freqs, fft_magnitude) - - if fundamental_freq is None or fundamental_amplitude < 1e-6: - debug_print("Fundamental frequency not detected or amplitude too low. Skipping THD calculation.", "warning") - return 0.0 - - harmonic_amplitudes = [] - for n in range(2, num_harmonics + 1): - harmonic_freq = n * fundamental_freq - if harmonic_freq > sampling_rate / 2: - break - harmonic_idx = np.argmin(np.abs(fft_freqs - harmonic_freq)) - harmonic_amp = fft_magnitude[harmonic_idx] - harmonic_amplitudes.append(harmonic_amp) - debug_print(f"Harmonic {n} frequency: {harmonic_freq:.2f} Hz, amplitude: {harmonic_amp:.4f}", "info") - - harmonic_sum = np.sqrt(np.sum(np.square(harmonic_amplitudes))) - thd = (harmonic_sum / fundamental_amplitude) * 100 if fundamental_amplitude > 0 else 0.0 - debug_print(f"THD Calculation: {thd:.2f}%", "info") - return thd - - -def calculate_spl(audio, mic_sensitivity_db): - """ - Calculates the Sound Pressure Level (SPL) from the audio signal. - """ - rms_amplitude = np.sqrt(np.mean(audio ** 2)) - if rms_amplitude == 0: - debug_print("RMS amplitude is zero. SPL cannot be calculated.", "warning") - return -np.inf - - mic_sensitivity_linear = 10 ** (mic_sensitivity_db / 20) - pressure = rms_amplitude / mic_sensitivity_linear - spl = 20 * np.log10(pressure / REFERENCE_PRESSURE) - debug_print(f"Calculated SPL: {spl:.2f} dB", "info") - return spl - - -def detect_microphone_overload(spl, mic_clipping_spl): - """ - Detects if the calculated SPL is approaching the microphone's clipping SPL. - """ - if spl >= mic_clipping_spl - 3: - debug_print("Microphone overload detected.", "warning") - return True - return False - - -def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate): - """ - Captures audio from an RTSP stream, calculates RMS, THD, and SPL, and detects microphone overload. - """ - cmd = [ - 'ffmpeg', '-loglevel', 'error', '-rtsp_transport', 'tcp', '-i', rtsp_url, - '-vn', '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(sampling_rate), - '-ac', '1', '-t', str(AUDIO_CAPTURE_DURATION), '-' - ] - - retries = 3 - for attempt in range(retries): - try: - debug_print(f"Attempt {attempt + 1} to capture audio from {rtsp_url}", "info") - process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - - if process.returncode != 0: - debug_print(f"ffmpeg failed with error: {stderr.decode()}", "error") - time.sleep(5) - continue - - audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0 - debug_print(f"Captured {len(audio)} samples from audio stream.", "info") - if len(audio) == 0: - debug_print("No audio data captured.", "warning") - time.sleep(5) - continue - - filtered_audio = sosfilt(bandpass_sos, audio) - rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2)) - thd_percentage = thd_calculation(filtered_audio, sampling_rate) - spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB) - overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL) - - return rms_amplitude, thd_percentage, spl, overload - - except Exception as e: - debug_print(f"Exception during audio processing: {e}", "error") - time.sleep(5) # Small delay before retrying - - return None, None, None, False - - -def audio_capture_process(queue, rtsp_url, sos, sampling_rate): - """ - Separate process for capturing and processing audio. - """ - while True: - rms, thd, spl, overload = calculate_noise_rms_and_thd(rtsp_url, sos, sampling_rate) - queue.put((rms, thd, spl, overload)) - time.sleep(60) # Wait for the next capture cycle - - def main(): - """ - Main loop that continuously monitors background noise, detects clipping, calculates THD, - and adjusts microphone gain with multiprocessing for audio capture and processing. - """ - TREND_COUNT = 0 - PREVIOUS_TREND = 0 - - # Precompute bandpass filter coefficients with updated SAMPLING_RATE and lower FILTER_ORDER - sos = butter(FILTER_ORDER, [LOWCUT, HIGHCUT], btype='band', fs=SAMPLING_RATE, output='sos') - - # Set the microphone gain to the maximum gain at the start - success = set_gain_db(MICROPHONE_NAME, MAX_GAIN_DB) - if success: - print(f"Microphone gain set to {MAX_GAIN_DB} dB at start.") + # Example usage + current_gain_db = get_gain_db(MICROPHONE_NAME) + if current_gain_db is not None: + if current_gain_db < MAX_GAIN_DB: + new_gain_db = min(current_gain_db + INCREASE_GAIN_STEP_DB, MAX_GAIN_DB) + if set_gain_db(MICROPHONE_NAME, new_gain_db): + print(f"Gain increased to {new_gain_db} dB") + else: + print("Gain is already at maximum and will not be increased.") else: - print("Failed to set microphone gain at start. Exiting.") - sys.exit(1) - - # Initialize multiprocessing queue - manager = mp.Manager() - queue = manager.Queue() - - # Start audio capture and processing in a separate process - p = mp.Process(target=audio_capture_process, args=(queue, RTSP_URL, sos, SAMPLING_RATE)) - p.start() - debug_print("Started audio capture and processing process.", "info") - - while True: - if not queue.empty(): - rms, thd, spl, overload = queue.get() - else: - time.sleep(1) - continue - - if rms is None: - print("Failed to compute noise RMS. Retrying in 1 minute...") - continue - - # Adjust gain if overload detected - if overload: - current_gain_db = get_gain_db(MICROPHONE_NAME) - if current_gain_db is not None: - NEW_GAIN_DB = max(current_gain_db - CLIPPING_REDUCTION_DB, MIN_GAIN_DB) - if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): - print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB") - current_gain_db = NEW_GAIN_DB # Update current gain - # Output summary log - summary_log(current_gain_db if current_gain_db else MIN_GAIN_DB, True, rms, thd) - continue # Skip trend adjustment in case of clipping - - # Handle THD if SPL is above threshold - if spl >= THD_FUNDAMENTAL_THRESHOLD_DB: - if thd > MAX_THD_PERCENTAGE: - debug_print(f"High THD detected: {thd:.2f}%", "warning") - current_gain_db = get_gain_db(MICROPHONE_NAME) - if current_gain_db is not None: - NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB) - if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): - print(f"High THD detected. Decreased gain to {NEW_GAIN_DB} dB") - current_gain_db = NEW_GAIN_DB # Update current gain - else: - debug_print("THD within acceptable limits.", "info") - else: - debug_print("SPL below THD calculation threshold. Skipping THD check.", "info") - - # Determine the noise trend - if rms > NOISE_THRESHOLD_HIGH: - CURRENT_TREND = 1 - elif rms < NOISE_THRESHOLD_LOW: - CURRENT_TREND = -1 - else: - CURRENT_TREND = 0 - - debug_print(f"Current trend: {CURRENT_TREND}", "info") - - if CURRENT_TREND != 0: - if CURRENT_TREND == PREVIOUS_TREND: - TREND_COUNT += 1 - else: - TREND_COUNT = 1 - PREVIOUS_TREND = CURRENT_TREND - else: - TREND_COUNT = 0 - - debug_print(f"Trend count: {TREND_COUNT}", "info") - - current_gain_db = get_gain_db(MICROPHONE_NAME) - - if current_gain_db is None: - print("Failed to get current gain level. Retrying in 1 minute...") - continue - - # Output summary log for the current state - summary_log(current_gain_db, overload, rms, thd) - - # Adjust gain based on noise trend if threshold count is reached - if TREND_COUNT >= TREND_COUNT_THRESHOLD: - if CURRENT_TREND == 1: - # Decrease gain by DECREASE_GAIN_STEP_DB dB - NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB) - if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): - print(f"Background noise high. Decreased gain to {NEW_GAIN_DB} dB") - current_gain_db = NEW_GAIN_DB # Update current gain - TREND_COUNT = 0 - elif CURRENT_TREND == -1: - # Increase gain by INCREASE_GAIN_STEP_DB dB - NEW_GAIN_DB = min(current_gain_db + INCREASE_GAIN_STEP_DB, MAX_GAIN_DB) - if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB): - print(f"Background noise low. Increased gain to {NEW_GAIN_DB} dB") - current_gain_db = NEW_GAIN_DB # Update current gain - TREND_COUNT = 0 - else: - debug_print("No gain adjustment needed based on noise trend.", "info") - - # Sleep for 1 minute before the next iteration - time.sleep(60) - + print("Failed to retrieve current gain level.") if __name__ == "__main__": main()