mirror of
https://github.com/alexbelgium/hassio-addons.git
synced 2026-05-23 17:21:56 +02:00
Update DOCS.md
This commit is contained in:
@@ -421,7 +421,7 @@ Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py"
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
Dynamic Microphone Gain Adjustment Script with Interactive Calibration,
|
Dynamic Microphone Gain Adjustment Script with Interactive Calibration,
|
||||||
Self Modification, and a Test Mode for Real Time RMS Line Graph using plotext
|
Self‑Modification, No‑Signal Reboot Logic, and a Test Mode for Real‑Time RMS Line Graph using plotext
|
||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
./autogain.py -> Normal dynamic gain control
|
./autogain.py -> Normal dynamic gain control
|
||||||
@@ -438,68 +438,99 @@ import re
|
|||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
# ---------------------- Default Configuration ----------------------
|
||||||
|
|
||||||
MICROPHONE_NAME = "Line In 1 Gain"
|
MICROPHONE_NAME = "Line In 1 Gain"
|
||||||
MIN_GAIN_DB = 30
|
MIN_GAIN_DB = 30
|
||||||
MAX_GAIN_DB = 38
|
MAX_GAIN_DB = 38
|
||||||
GAIN_STEP_DB = 3
|
GAIN_STEP_DB = 3
|
||||||
|
|
||||||
|
# RMS thresholds
|
||||||
NOISE_THRESHOLD_HIGH = 0.01
|
NOISE_THRESHOLD_HIGH = 0.01
|
||||||
NOISE_THRESHOLD_LOW = 0.001
|
NOISE_THRESHOLD_LOW = 0.001
|
||||||
|
|
||||||
SAMPLING_RATE = 48000
|
# No-signal detection
|
||||||
LOWCUT = 2000
|
NO_SIGNAL_THRESHOLD = 1e-6
|
||||||
HIGHCUT = 8000
|
NO_SIGNAL_COUNT_THRESHOLD = 3
|
||||||
FILTER_ORDER = 4
|
|
||||||
RTSP_URL = "rtsp://192.168.178.124:8554/birdmic"
|
SAMPLING_RATE = 48000 # 48 kHz
|
||||||
|
LOWCUT = 2000
|
||||||
|
HIGHCUT = 8000
|
||||||
|
FILTER_ORDER = 4
|
||||||
|
RTSP_URL = "rtsp://192.168.178.124:8554/birdmic"
|
||||||
SLEEP_SECONDS = 10
|
SLEEP_SECONDS = 10
|
||||||
|
|
||||||
REFERENCE_PRESSURE = 20e-6
|
REFERENCE_PRESSURE = 20e-6 # 20 µPa
|
||||||
DEFAULT_SNR = 80.0
|
|
||||||
DEFAULT_SELF_NOISE = 14.0
|
|
||||||
DEFAULT_CLIPPING = 120.0
|
|
||||||
DEFAULT_SENSITIVITY = -28.0
|
|
||||||
|
|
||||||
NO_SIGNAL_THRESHOLD = 0.00001
|
# Default microphone specifications (for calibration reference)
|
||||||
NO_SIGNAL_COUNT_MAX = 3
|
DEFAULT_SNR = 80.0 # dB
|
||||||
NO_SIGNAL_ACTION = "scarlett2 reboot && sudo reboot"
|
DEFAULT_SELF_NOISE = 14.0 # dB-A
|
||||||
|
DEFAULT_CLIPPING = 120.0 # dB SPL
|
||||||
|
DEFAULT_SENSITIVITY = -28.0 # dB re 1 V/Pa
|
||||||
|
|
||||||
def_full_scale = (REFERENCE_PRESSURE *
|
# Compute the default full-scale amplitude (used to derive default fractions)
|
||||||
10 ** (DEFAULT_CLIPPING / 20) *
|
def_full_scale = (
|
||||||
10 ** (DEFAULT_SENSITIVITY / 20))
|
REFERENCE_PRESSURE *
|
||||||
|
10 ** (DEFAULT_CLIPPING / 20) *
|
||||||
|
10 ** (DEFAULT_SENSITIVITY / 20)
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---------------------- Argument Parsing ----------------------
|
||||||
|
|
||||||
def parse_args():
|
def parse_args():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser(
|
||||||
parser.add_argument("--calibrate", action="store_true")
|
description="Dynamic Mic Gain Adjustment with calibration, test mode, self‑modification, and reboot logic."
|
||||||
parser.add_argument("--test", action="store_true")
|
)
|
||||||
|
parser.add_argument("--calibrate", action="store_true", help="Run interactive calibration mode")
|
||||||
|
parser.add_argument("--test", action="store_true", help="Run test mode to display a real‑time RMS graph using plotext")
|
||||||
return parser.parse_args()
|
return parser.parse_args()
|
||||||
|
|
||||||
|
# ---------------------- Audio & Gain Helpers ----------------------
|
||||||
|
|
||||||
def debug_print(msg, level="info"):
|
def debug_print(msg, level="info"):
|
||||||
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] [{level.upper()}] {msg}")
|
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||||
|
print(f"[{current_time}] [{level.upper()}] {msg}")
|
||||||
|
|
||||||
def get_gain_db(mic_name):
|
def get_gain_db(mic_name):
|
||||||
try:
|
try:
|
||||||
output = subprocess.check_output(['amixer', 'sget', mic_name], stderr=subprocess.STDOUT).decode()
|
output = subprocess.check_output(
|
||||||
|
['amixer', 'sget', mic_name], stderr=subprocess.STDOUT
|
||||||
|
).decode()
|
||||||
match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output)
|
match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output)
|
||||||
return float(match.group(1)) if match else None
|
if match:
|
||||||
except Exception:
|
return float(match.group(1))
|
||||||
return None
|
except subprocess.CalledProcessError as e:
|
||||||
|
debug_print(f"amixer sget failed: {e}", "error")
|
||||||
|
return None
|
||||||
|
|
||||||
def set_gain_db(mic_name, gain_db):
|
def set_gain_db(mic_name, gain_db):
|
||||||
gain_db = max(min(gain_db, MAX_GAIN_DB), MIN_GAIN_DB)
|
gain_db = max(min(gain_db, MAX_GAIN_DB), MIN_GAIN_DB)
|
||||||
try:
|
try:
|
||||||
subprocess.check_call(['amixer', 'sset', mic_name, f'{int(gain_db)}dB'], stdout=subprocess.DEVNULL)
|
subprocess.check_call(
|
||||||
|
['amixer', 'sset', mic_name, f'{int(gain_db)}dB'],
|
||||||
|
stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
|
||||||
|
)
|
||||||
|
debug_print(f"Gain set to: {gain_db} dB", "info")
|
||||||
return True
|
return True
|
||||||
except Exception:
|
except subprocess.CalledProcessError as e:
|
||||||
return False
|
debug_print(f"Failed to set gain: {e}", "error")
|
||||||
|
return False
|
||||||
|
|
||||||
def capture_audio(rtsp_url, duration=5):
|
def capture_audio(rtsp_url, duration=5):
|
||||||
cmd = ['ffmpeg', '-loglevel', 'error', '-rtsp_transport', 'tcp', '-i', rtsp_url,
|
cmd = [
|
||||||
'-vn', '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(SAMPLING_RATE), '-ac', '1', '-t', str(duration), '-']
|
'ffmpeg', '-loglevel', 'error', '-rtsp_transport', 'tcp',
|
||||||
|
'-i', rtsp_url, '-vn', '-f', 's16le', '-acodec', 'pcm_s16le',
|
||||||
|
'-ar', str(SAMPLING_RATE), '-ac', '1', '-t', str(duration), '-'
|
||||||
|
]
|
||||||
try:
|
try:
|
||||||
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
stdout, _ = process.communicate()
|
stdout, stderr = process.communicate()
|
||||||
|
if process.returncode != 0:
|
||||||
|
debug_print(f"ffmpeg failed: {stderr.decode().strip()}", "error")
|
||||||
|
return None
|
||||||
return np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0
|
return np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0
|
||||||
except Exception:
|
except Exception as e:
|
||||||
|
debug_print(f"Audio capture exception: {e}", "error")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def bandpass_filter(audio, lowcut, highcut, fs, order=4):
|
def bandpass_filter(audio, lowcut, highcut, fs, order=4):
|
||||||
@@ -509,25 +540,109 @@ def bandpass_filter(audio, lowcut, highcut, fs, order=4):
|
|||||||
def measure_rms(audio):
|
def measure_rms(audio):
|
||||||
return float(np.sqrt(np.mean(audio**2))) if len(audio) > 0 else 0.0
|
return float(np.sqrt(np.mean(audio**2))) if len(audio) > 0 else 0.0
|
||||||
|
|
||||||
|
# ---------------------- Interactive Calibration ----------------------
|
||||||
|
|
||||||
|
def prompt_float(prompt_str, default_val):
|
||||||
|
while True:
|
||||||
|
user_input = input(f"{prompt_str} [{default_val}]: ").strip()
|
||||||
|
if user_input == "":
|
||||||
|
return default_val
|
||||||
|
try:
|
||||||
|
return float(user_input)
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid input; please enter a numeric value.")
|
||||||
|
|
||||||
|
def interactive_calibration():
|
||||||
|
print("\n-- INTERACTIVE CALIBRATION --")
|
||||||
|
print("Enter the microphone characteristics (press Enter to accept default):\n")
|
||||||
|
snr = prompt_float("1) Signal-to-Noise Ratio (dB)", DEFAULT_SNR)
|
||||||
|
self_noise = prompt_float("2) Self Noise (dB-A)", DEFAULT_SELF_NOISE)
|
||||||
|
clipping = prompt_float("3) Clipping SPL (dB)", DEFAULT_CLIPPING)
|
||||||
|
sensitivity = prompt_float("4) Sensitivity (dB re 1 V/Pa)", DEFAULT_SENSITIVITY)
|
||||||
|
return {"snr": snr, "self_noise": self_noise, "clipping": clipping, "sensitivity": sensitivity}
|
||||||
|
|
||||||
|
def calibrate_and_propose(mic_params):
|
||||||
|
user_snr = mic_params["snr"]
|
||||||
|
clipping = mic_params["clipping"]
|
||||||
|
sensitivity = mic_params["sensitivity"]
|
||||||
|
|
||||||
|
user_full_scale = (
|
||||||
|
REFERENCE_PRESSURE *
|
||||||
|
10 ** (clipping / 20) *
|
||||||
|
10 ** (sensitivity / 20)
|
||||||
|
)
|
||||||
|
fraction_high_default = NOISE_THRESHOLD_HIGH / def_full_scale
|
||||||
|
fraction_low_default = NOISE_THRESHOLD_LOW / def_full_scale
|
||||||
|
snr_ratio = user_snr / DEFAULT_SNR
|
||||||
|
|
||||||
|
proposed_high = fraction_high_default * user_full_scale * snr_ratio
|
||||||
|
proposed_low = fraction_low_default * user_full_scale * snr_ratio
|
||||||
|
gain_offset = (DEFAULT_SENSITIVITY - sensitivity)
|
||||||
|
proposed_min_gain = MIN_GAIN_DB + gain_offset
|
||||||
|
proposed_max_gain = MAX_GAIN_DB + gain_offset
|
||||||
|
|
||||||
|
print("\n===============================================================")
|
||||||
|
print("CURRENT VALUES:")
|
||||||
|
print("---------------------------------------------------------------")
|
||||||
|
print(f" NOISE_THRESHOLD_HIGH: {NOISE_THRESHOLD_HIGH:.7f}")
|
||||||
|
print(f" NOISE_THRESHOLD_LOW: {NOISE_THRESHOLD_LOW:.7f}")
|
||||||
|
print(f" MIN_GAIN_DB: {MIN_GAIN_DB}")
|
||||||
|
print(f" MAX_GAIN_DB: {MAX_GAIN_DB}")
|
||||||
|
print("---------------------------------------------------------------\n")
|
||||||
|
print("PROPOSED VALUES:")
|
||||||
|
print("---------------------------------------------------------------")
|
||||||
|
print(f" Proposed NOISE_THRESHOLD_HIGH: {proposed_high:.7f}")
|
||||||
|
print(f" Proposed NOISE_THRESHOLD_LOW: {proposed_low:.7f}\n")
|
||||||
|
print(" Proposed Gain Range (dB):")
|
||||||
|
print(f" MIN_GAIN_DB: {proposed_min_gain:.2f}")
|
||||||
|
print(f" MAX_GAIN_DB: {proposed_max_gain:.2f}")
|
||||||
|
print("---------------------------------------------------------------\n")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"noise_threshold_high": proposed_high,
|
||||||
|
"noise_threshold_low": proposed_low,
|
||||||
|
"min_gain_db": proposed_min_gain,
|
||||||
|
"max_gain_db": proposed_max_gain,
|
||||||
|
}
|
||||||
|
|
||||||
|
def persist_calibration_to_script(script_path, proposal):
|
||||||
|
subs = {
|
||||||
|
"NOISE_THRESHOLD_HIGH": f"{proposal['noise_threshold_high']:.7f}",
|
||||||
|
"NOISE_THRESHOLD_LOW": f"{proposal['noise_threshold_low']:.7f}",
|
||||||
|
"MIN_GAIN_DB": f"{int(round(proposal['min_gain_db']))}",
|
||||||
|
"MAX_GAIN_DB": f"{int(round(proposal['max_gain_db']))}"
|
||||||
|
}
|
||||||
|
for var, val in subs.items():
|
||||||
|
cmd = f"sed -i 's|^{var} = .*|{var} = {val}|' \"{script_path}\""
|
||||||
|
os.system(cmd)
|
||||||
|
print("✅ Script has been updated with the new calibration values.\n")
|
||||||
|
|
||||||
|
# ---------------------- Test Mode: Real-Time RMS Graph using plotext ----------------------
|
||||||
|
|
||||||
def test_mode():
|
def test_mode():
|
||||||
try:
|
try:
|
||||||
import plotext as plt
|
import plotext as plt
|
||||||
except ImportError:
|
except ImportError:
|
||||||
print("plotext is required. Install it using: pip install plotext")
|
print("plotext is required for test mode. Please install it using 'pip install plotext'.")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
print("\n-- TEST MODE: Real-Time RMS Line Graph (plotext) --")
|
print("\n-- TEST MODE: Real-Time RMS Line Graph (plotext) --")
|
||||||
rms_history, iterations = [], []
|
print("Recording 5-second samples in a loop. Press Ctrl+C to exit.\n")
|
||||||
max_points, i = 20, 0
|
rms_history = []
|
||||||
|
iterations = []
|
||||||
|
max_points = 20
|
||||||
|
i = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
audio = capture_audio(RTSP_URL, duration=5)
|
audio = capture_audio(RTSP_URL, duration=5)
|
||||||
if audio is None:
|
if audio is None or len(audio) == 0:
|
||||||
print("No audio captured, retrying...")
|
print("No audio captured, retrying...")
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
rms = measure_rms(bandpass_filter(audio, LOWCUT, HIGHCUT, SAMPLING_RATE))
|
filtered = bandpass_filter(audio, LOWCUT, HIGHCUT, SAMPLING_RATE, FILTER_ORDER)
|
||||||
|
rms = measure_rms(filtered)
|
||||||
|
|
||||||
rms_history.append(rms)
|
rms_history.append(rms)
|
||||||
iterations.append(i)
|
iterations.append(i)
|
||||||
i += 1
|
i += 1
|
||||||
@@ -556,47 +671,69 @@ def test_mode():
|
|||||||
print(f"Current RMS: {rms:.6f} — {status}")
|
print(f"Current RMS: {rms:.6f} — {status}")
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# ---------------------- Dynamic Gain Control Loop ----------------------
|
||||||
|
|
||||||
def dynamic_gain_control():
|
def dynamic_gain_control():
|
||||||
debug_print("Starting dynamic gain control...")
|
debug_print("Starting dynamic gain controller...")
|
||||||
set_gain_db(MICROPHONE_NAME, (MIN_GAIN_DB + MAX_GAIN_DB) // 2)
|
set_gain_db(MICROPHONE_NAME, (MIN_GAIN_DB + MAX_GAIN_DB) // 2)
|
||||||
|
|
||||||
no_signal_count = 0
|
no_signal_count = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
audio = capture_audio(RTSP_URL)
|
audio = capture_audio(RTSP_URL)
|
||||||
if audio is None:
|
if audio is None or len(audio) == 0:
|
||||||
|
debug_print("No audio captured; retrying...", "warning")
|
||||||
time.sleep(SLEEP_SECONDS)
|
time.sleep(SLEEP_SECONDS)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
rms = measure_rms(bandpass_filter(audio, LOWCUT, HIGHCUT, SAMPLING_RATE))
|
filtered = bandpass_filter(audio, LOWCUT, HIGHCUT, SAMPLING_RATE, FILTER_ORDER)
|
||||||
gain = get_gain_db(MICROPHONE_NAME)
|
rms = measure_rms(filtered)
|
||||||
if gain is None:
|
debug_print(f"Measured RMS: {rms:.6f}", "info")
|
||||||
time.sleep(SLEEP_SECONDS)
|
|
||||||
continue
|
|
||||||
|
|
||||||
|
# No-signal detection
|
||||||
if rms < NO_SIGNAL_THRESHOLD:
|
if rms < NO_SIGNAL_THRESHOLD:
|
||||||
no_signal_count += 1
|
no_signal_count += 1
|
||||||
debug_print(f"No signal detected (RMS: {rms:.7f}) — count = {no_signal_count}", "warning")
|
debug_print(f"No signal detected ({no_signal_count}/{NO_SIGNAL_COUNT_THRESHOLD})", "warning")
|
||||||
if no_signal_count >= NO_SIGNAL_COUNT_MAX:
|
if no_signal_count >= NO_SIGNAL_COUNT_THRESHOLD:
|
||||||
debug_print("!! NO AUDIO SIGNAL — executing recovery action", "error")
|
debug_print("No signal for too long, rebooting Scarlett + system...", "error")
|
||||||
os.system(NO_SIGNAL_ACTION)
|
subprocess.call("scarlett2 reboot && sudo reboot", shell=True)
|
||||||
no_signal_count = 0
|
|
||||||
else:
|
else:
|
||||||
no_signal_count = 0
|
no_signal_count = 0
|
||||||
|
|
||||||
if rms > NOISE_THRESHOLD_HIGH:
|
current_gain = get_gain_db(MICROPHONE_NAME)
|
||||||
set_gain_db(MICROPHONE_NAME, gain - GAIN_STEP_DB)
|
if current_gain is None:
|
||||||
elif rms < NOISE_THRESHOLD_LOW:
|
debug_print("Failed to read current gain; skipping cycle.", "warning")
|
||||||
set_gain_db(MICROPHONE_NAME, gain + GAIN_STEP_DB)
|
time.sleep(SLEEP_SECONDS)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if rms > NOISE_THRESHOLD_HIGH:
|
||||||
|
set_gain_db(MICROPHONE_NAME, current_gain - GAIN_STEP_DB)
|
||||||
|
elif rms < NOISE_THRESHOLD_LOW:
|
||||||
|
set_gain_db(MICROPHONE_NAME, current_gain + GAIN_STEP_DB)
|
||||||
|
|
||||||
debug_print(f"RMS: {rms:.6f} | Gain: {gain} dB")
|
|
||||||
time.sleep(SLEEP_SECONDS)
|
time.sleep(SLEEP_SECONDS)
|
||||||
|
|
||||||
|
# ---------------------- Main ----------------------
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
args = parse_args()
|
args = parse_args()
|
||||||
|
|
||||||
|
if args.calibrate:
|
||||||
|
mic_params = interactive_calibration()
|
||||||
|
proposal = calibrate_and_propose(mic_params)
|
||||||
|
save = input("Save these values permanently into the script? [y/N]: ").strip().lower()
|
||||||
|
if save in ["y", "yes"]:
|
||||||
|
persist_calibration_to_script(os.path.abspath(__file__), proposal)
|
||||||
|
print("👍 Calibration values saved. Exiting now.\n")
|
||||||
|
else:
|
||||||
|
print("❌ Not saving values. Exiting.\n")
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
if args.test:
|
if args.test:
|
||||||
test_mode()
|
test_mode()
|
||||||
else:
|
sys.exit(0)
|
||||||
dynamic_gain_control()
|
|
||||||
|
dynamic_gain_control()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
Reference in New Issue
Block a user