478 lines
19 KiB
Python
478 lines
19 KiB
Python
#!/usr/bin/env python3
"""
Redisplay: Redilysis Audio Analysis UI
v0.2

This script provides a real-time audio visualization tool featuring a spectrum
analyzer, a waveform display, and multi-band VU meters. It captures audio from a
selected input device, processes it, and displays the analysis in a user-friendly
GUI built with Tkinter.

Key Features:
- Live waveform and spectrum plotting using Matplotlib.
- Configurable audio parameters (sample rate, window size).
- Variable display refresh rate.
- Device selection from available system audio inputs.
- VU meters for different frequency bands.
- Pushes analysis data to Redis, including spectrum, waveform, RMS, and frequency bins.
- Dark theme for the GUI using sv_ttk.

Redis Keys:
- 'Db': The full magnitude spectrum in dB (NumPy array, serialized to bytes).
- 'wave': The raw audio waveform data (NumPy array, serialized to bytes).
- 'spectrum_10': A string representation of a list of levels (each 0-10), one per VU meter frequency band.
- 'rms': The Root Mean Square of the raw audio signal (string).

Author: Sam
Date: 2025-07-15
License: to be defined
"""
|
|
import tkinter as tk
|
|
from tkinter import ttk, messagebox
|
|
import tkinter.font as tkfont
|
|
import sounddevice as sd
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
|
|
from matplotlib.animation import FuncAnimation
|
|
import queue
|
|
import sv_ttk
|
|
import sys
|
|
import redis
|
|
from scipy.fft import rfft, rfftfreq
|
|
|
|
# Interval passed to FuncAnimation in start_stream; Matplotlib interprets it as
# the delay between display refreshes in milliseconds (25 ms is about 40 fps).
refresh_rate = 25
|
|
|
|
class VUMeter(tk.Canvas):
    """Vertical segmented level meter drawn on a Tk canvas.

    The meter shows ``level`` (clamped to 0.0-1.0) as a stack of coloured
    segments growing from the bottom of the canvas. It redraws itself when
    the widget is resized and when the number of lit segments changes.
    """

    # Number of stacked segments that make up the full-scale bar.
    NUM_SEGMENTS = 20

    def __init__(self, parent, *args, **kwargs):
        """Create the meter.

        Args:
            parent: Tk container widget.
            *args, **kwargs: Forwarded to ``tk.Canvas``. ``width``, ``bg`` and
                ``highlightthickness`` default to the dark-theme look but are
                no longer overwritten when the caller supplies them.
        """
        kwargs.setdefault('width', 40)
        kwargs.setdefault('bg', '#282828')
        kwargs.setdefault('highlightthickness', 0)
        super().__init__(parent, *args, **kwargs)
        self.level = 0
        self.bind("<Configure>", self._draw_meter)

    def _draw_meter(self, event=None):
        """Redraw every lit segment for the current level and widget size.

        Args:
            event: Tk ``<Configure>`` event when triggered by a resize;
                unused.
        """
        self.delete("all")
        width = self.winfo_width()
        height = self.winfo_height()
        num_segments = self.NUM_SEGMENTS
        segment_height = (height - 4) / num_segments
        lit_segments = min(int(self.level * num_segments), num_segments)

        # Segment 0 sits at the bottom; only lit segments are drawn.
        for i in range(lit_segments):
            y0 = height - (i + 1) * segment_height
            y1 = height - i * segment_height - 2  # 2 px gap between segments
            self.create_rectangle(
                2, y0, width - 2, y1,
                fill=self._get_color(i, num_segments), outline="",
            )

    def _get_color(self, index, total):
        """Return the fill colour for segment ``index`` out of ``total``.

        Segments above 85 % of the scale are red, above 70 % orange-red,
        above 50 % orange, and green otherwise.
        """
        if index > total * 0.85:
            return "#ff0000"
        if index > total * 0.7:
            return "#ff4500"
        if index > total * 0.5:
            return "#ffa500"
        return "#00dd00"

    def set_level(self, level):
        """Set the displayed level.

        Args:
            level: New level; values outside 0.0-1.0 are clamped.

        The canvas is only redrawn when the number of lit segments actually
        changes, which avoids deleting and recreating every canvas item on
        each animation frame.
        """
        previous_lit = int(self.level * self.NUM_SEGMENTS)
        self.level = max(0.0, min(1.0, level))
        if int(self.level * self.NUM_SEGMENTS) != previous_lit:
            self._draw_meter()
|
|
|
|
class AudioPlotterApp(tk.Tk):
    """Main Redilysis window.

    Builds the control panel, the VU-meter strip and the Matplotlib plot
    area. While a stream is running, audio blocks delivered by
    ``sounddevice`` are queued by ``audio_callback`` and consumed by
    ``update_plot`` on every animation tick, which refreshes the plots and
    VU meters and pushes the analysis to Redis when a connection exists.
    """

    # Seconds to wait on Redis sockets. Connection attempts and writes run on
    # the Tk main thread, so an unreachable host must not freeze the UI.
    REDIS_TIMEOUT_S = 1.0

    def __init__(self):
        """Create the window, initialise state and build all widgets."""
        super().__init__()
        self.title("Redilysis")
        self.geometry("1000x700")
        sv_ttk.set_theme("dark")

        self.stream = None
        self.animation = None
        self.q = queue.Queue()
        self.vu_meters = []
        self.samplerate = 44100  # Default, will be overwritten
        self.window_size = 1024  # Overwritten from the UI in start_stream
        self.spectrum_line = None
        self.waveform_line = None
        self.xf = None
        # Initialised up front so later code can rely on the attributes.
        self.redis_conn = None
        self.rms = 0.0
        self.tenbins = "[]"

        # Define small font style for compact controls
        self.small_font = tkfont.Font(size=9)
        style = ttk.Style(self)
        for style_name in ('Small.TLabel', 'Small.TEntry', 'Small.TButton',
                           'Small.TCombobox', 'Small.TLabelframe.Label'):
            style.configure(style_name, font=self.small_font)

        # Apply small font to default ttk widget families in the app
        for pattern in ("*TLabel.Font", "*TEntry.Font", "*TButton.Font",
                        "*TCombobox*Font", "*Labelframe.LabelFont"):
            self.option_add(pattern, self.small_font)

        self._create_widgets()
        self._force_focus()

    def _force_focus(self):
        """Force the window to the front, specifically for macOS."""
        if sys.platform != "darwin":
            return
        self.lift()
        self.attributes('-topmost', True)
        self.after_idle(self.attributes, '-topmost', False)
        self.focus_force()

    def _create_widgets(self):
        """Lay out the controls / VU meters (top) and the plots (bottom)."""
        main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
        main_pane.pack(fill=tk.BOTH, expand=True)

        top_pane = ttk.PanedWindow(main_pane, orient=tk.HORIZONTAL)
        main_pane.add(top_pane, weight=1)

        controls_frame = ttk.Frame(top_pane, padding="10")
        top_pane.add(controls_frame, weight=0)

        vu_meter_frame = ttk.Frame(top_pane, padding="5")
        top_pane.add(vu_meter_frame, weight=1)

        self.plot_frame = ttk.Frame(main_pane)
        main_pane.add(self.plot_frame, weight=4)

        self._create_controls(controls_frame)
        self._create_vu_meters_area(vu_meter_frame)
        self._create_plot_area()

    def _create_controls(self, parent):
        """Build the control panel inside ``parent`` and try to reach Redis.

        Rows, top to bottom: parameter frames (audio | Redis), the audio
        device selector, and the Start/Quit buttons.
        """
        parent.columnconfigure(0, weight=1)

        # --- Top frame for parameters (2 columns) ---
        top_params_frame = ttk.Frame(parent)
        top_params_frame.grid(row=0, column=0, sticky="ew", pady=5)
        top_params_frame.columnconfigure((0, 1), weight=1)
        self._create_audio_params(top_params_frame)
        self._create_redis_params(top_params_frame)

        # --- Middle frame: Device selector (full width) ---
        self._create_device_selector(parent, row=1)

        # --- Bottom frame: Buttons (full width) ---
        self._create_buttons(parent, row=2)

        # Attempt initial Redis connection
        self.connect_redis()

    def _create_audio_params(self, parent):
        """Left column: one labelled entry per audio setting."""
        frame = ttk.LabelFrame(parent, text="Audio Parameters")
        frame.grid(row=0, column=0, padx=(0, 5), sticky="nsew")
        frame.columnconfigure(1, weight=1)

        self.settings = {}
        settings_map = {
            'window_size': ('Window Size', 1024),
            'downsample': ('Downsample', 1),
            'vu_bands': ('VU Bands', 12),
            'samplerate': ('Sample Rate', '44100'),
        }
        for row, (key, (text, default)) in enumerate(settings_map.items()):
            ttk.Label(frame, text=text, style='Small.TLabel').grid(
                row=row, column=0, padx=5, pady=1, sticky="w")
            var = tk.StringVar(value=str(default))
            ttk.Entry(frame, textvariable=var, style='Small.TEntry').grid(
                row=row, column=1, padx=5, pady=2, sticky="ew")
            self.settings[key] = var

    def _create_redis_params(self, parent):
        """Right column: Redis host/port entries and the Connect button."""
        frame = ttk.LabelFrame(parent, text="Redis Connection")
        frame.grid(row=0, column=1, padx=(5, 0), sticky="nsew")
        frame.columnconfigure(1, weight=1)

        # Default Redis connection parameters
        self.ip_var = tk.StringVar(value="127.0.0.1")
        self.port_var = tk.StringVar(value="6379")

        fields = (("IP Address:", self.ip_var), ("Port:", self.port_var))
        for row, (text, var) in enumerate(fields):
            ttk.Label(frame, text=text, style='Small.TLabel').grid(
                row=row, column=0, sticky="w", padx=5, pady=1)
            ttk.Entry(frame, textvariable=var, style='Small.TEntry').grid(
                row=row, column=1, sticky="ew", padx=5, pady=1)

        ttk.Button(frame, text="Connect", command=self.connect_redis,
                   style='Small.TButton').grid(
            row=2, column=0, columnspan=2, pady=4, padx=5, sticky="ew")

    def _create_device_selector(self, parent, row):
        """Full-width combobox listing the available input devices."""
        device_frame = ttk.LabelFrame(parent, text="Audio Device")
        device_frame.grid(row=row, column=0, sticky="ew", pady=5)
        device_frame.columnconfigure(0, weight=1)

        self.device_var = tk.StringVar()
        self.devices = self._get_devices()
        self.device_menu = ttk.Combobox(
            device_frame, textvariable=self.device_var,
            values=list(self.devices.keys()), state='readonly',
            style='Small.TCombobox')
        self.device_menu.grid(row=0, column=0, sticky="ew", padx=5, pady=5)

        try:
            default_input = sd.default.device['input']
        except (ValueError, IndexError, KeyError):
            default_input = None
        label = self._default_device_label(self.devices, default_input)
        if label:
            self.device_menu.set(label)

    @staticmethod
    def _default_device_label(devices, default_input):
        """Return the combobox label to preselect.

        Args:
            devices: Mapping of combobox label -> sounddevice device index.
            default_input: Device index of the system default input (may be
                ``None`` or an index that is not in ``devices``).

        Returns:
            The label whose device index equals ``default_input``; otherwise
            the first label; ``""`` when ``devices`` is empty.

        The labels only cover input-capable devices, so the device index must
        be matched against the mapping's values rather than used as a
        position in the label list.
        """
        for label, device_id in devices.items():
            if device_id == default_input:
                return label
        return next(iter(devices), "")

    def _create_buttons(self, parent, row):
        """Full-width Start/Stop and Quit buttons."""
        button_frame = ttk.Frame(parent)
        button_frame.grid(row=row, column=0, sticky="ew", pady=(10, 0))
        button_frame.columnconfigure((0, 1), weight=1)

        self.start_button = ttk.Button(
            button_frame, text="Start", command=self.toggle_stream,
            style='Small.TButton')
        self.start_button.grid(row=0, column=0, padx=5, sticky="ew")

        self.quit_button = ttk.Button(
            button_frame, text="Quit", command=self.on_closing,
            style='Small.TButton')
        self.quit_button.grid(row=0, column=1, padx=5, sticky="ew")

    def connect_redis(self):
        """Establish connection to Redis using current IP/Port entries.

        On success ``self.redis_conn`` holds the client; on any failure
        (invalid port, unreachable server, ...) the error is printed and
        ``self.redis_conn`` is reset to ``None`` so pushes are skipped.
        """
        try:
            host = self.ip_var.get()
            port = int(self.port_var.get())
            self.redis_conn = redis.Redis(
                host=host, port=port, decode_responses=True,
                socket_connect_timeout=self.REDIS_TIMEOUT_S,
                socket_timeout=self.REDIS_TIMEOUT_S,
            )
            self.redis_conn.ping()
            print(f"Connected to Redis at {host}:{port}")
        except Exception as e:
            # Best-effort boundary: the UI must keep working without Redis.
            print(f"Failed to connect to Redis: {e}")
            self.redis_conn = None

    def _create_vu_meters_area(self, parent):
        """Remember the VU-meter container and populate it."""
        self.vu_meter_container = parent
        self.vu_meter_container.bind("<Configure>", lambda e: self.recreate_vu_meters())
        self.recreate_vu_meters()

    def recreate_vu_meters(self):
        """Make the number of VU meters match the 'VU Bands' setting.

        An unparsable setting yields zero meters. Nothing is rebuilt when the
        band count is unchanged, so resize events (this method is bound to
        ``<Configure>``) no longer destroy and recreate every meter.
        """
        try:
            num_bands = int(self.settings['vu_bands'].get())
        except (ValueError, KeyError):
            num_bands = 0
        num_bands = max(num_bands, 0)

        if num_bands == len(self.vu_meters):
            return

        for widget in self.vu_meter_container.winfo_children():
            widget.destroy()
        self.vu_meters.clear()

        for _ in range(num_bands):
            vu_meter = VUMeter(self.vu_meter_container)
            vu_meter.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=2)
            self.vu_meters.append(vu_meter)

    def _create_plot_area(self):
        """Create the spectrum/waveform figure and embed it in the plot frame."""
        self.fig, (self.ax_spectrum, self.ax_waveform) = plt.subplots(
            2, 1, gridspec_kw={'height_ratios': [2, 3]}
        )
        self.fig.patch.set_facecolor('#282828')
        self.canvas = FigureCanvasTkAgg(self.fig, master=self.plot_frame)
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
        self._configure_plots()

    def _get_devices(self):
        """Return ``{"<index>: <name>": index}`` for every input-capable device."""
        devices = sd.query_devices()
        return {f"{i}: {d['name']}": i for i, d in enumerate(devices) if d['max_input_channels'] > 0}

    def _configure_plots(self):
        """Reset both axes (removing any plotted lines) and apply the dark style."""
        # Configure spectrum plot
        self.ax_spectrum.clear()
        self.ax_spectrum.set_xscale('log')
        self.ax_spectrum.set_yscale('linear')
        self.ax_spectrum.set_ylabel('Magnitude (dB)', color='white')
        self.ax_spectrum.set_ylim(-60, 10)
        self.ax_spectrum.set_xlim(20, self.samplerate / 2)
        self._apply_dark_style(self.ax_spectrum)

        # Configure waveform plot
        self.ax_waveform.clear()
        self.ax_waveform.set_xlabel('Time (Samples)', color='white')
        self.ax_waveform.set_ylabel('Amplitude', color='white')
        self.ax_waveform.set_ylim(-1, 1)
        self._apply_dark_style(self.ax_waveform)

        self.fig.tight_layout(pad=1.5)
        self.canvas.draw()

    @staticmethod
    def _apply_dark_style(ax):
        """Apply the shared dark background, grid, tick and spine colours."""
        ax.set_facecolor('#282828')
        ax.grid(True, color='gray', linestyle='--')
        ax.tick_params(axis='x', colors='white')
        ax.tick_params(axis='y', colors='white')
        for spine in ax.spines.values():
            spine.set_edgecolor('white')

    def toggle_stream(self):
        """Start the stream when idle, stop it when running."""
        if self.stream is None:
            self.start_stream()
        else:
            self.stop_stream()

    def start_stream(self):
        """Read the settings, open the input stream and start the animation.

        Any failure is reported in an error dialog and the application is
        returned to the stopped state.
        """
        print("DEBUG: Attempting to start stream...")
        try:
            self.recreate_vu_meters()
            device_name = self.device_var.get()
            device_id = self.devices[device_name]
            device_info = sd.query_devices(device_id, 'input')
            print(f"DEBUG: Selected device: {device_name} (ID: {device_id})")

            try:
                samplerate = float(self.settings['samplerate'].get())
                if samplerate <= 0:
                    raise ValueError("sample rate must be positive")
                self.samplerate = samplerate
                print(f"DEBUG: Using user-defined sample rate: {self.samplerate} Hz")
            except (ValueError, KeyError):
                self.samplerate = device_info['default_samplerate']
                self.settings['samplerate'].set(str(int(self.samplerate)))
                print(f"DEBUG: Invalid sample rate, using device default: {self.samplerate} Hz")

            window_size = int(self.settings['window_size'].get())
            if window_size <= 0:
                raise ValueError("Window size must be a positive integer")
            self.window_size = window_size
            print(f"DEBUG: Using window size: {self.window_size}")

            # Pre-calculate frequency bins
            self.xf = rfftfreq(self.window_size, 1 / self.samplerate)

            # Configure plots and pre-create line objects
            self._configure_plots()
            self.ax_waveform.set_xlim(0, self.window_size)
            self.spectrum_line, = self.ax_spectrum.plot(
                self.xf, np.zeros_like(self.xf), color='#00dd00')
            self.waveform_line, = self.ax_waveform.plot(
                np.arange(self.window_size), np.zeros(self.window_size),
                color='#00dd00')

            self.stream = sd.InputStream(
                device=device_id, channels=1,
                samplerate=self.samplerate, callback=self.audio_callback,
                blocksize=self.window_size, latency='low')

            self.animation = FuncAnimation(
                self.fig, self.update_plot, interval=refresh_rate, blit=True,
                cache_frame_data=False, save_count=sys.maxsize)
            self.stream.start()
            self.start_button.config(text="Stop")
            print("DEBUG: Stream started successfully.")

        except Exception as e:
            print(f"ERROR: Failed to start stream: {e}")
            messagebox.showerror("Error starting stream", str(e))
            self.stop_stream()

    def stop_stream(self):
        """Stop the animation and the audio stream and drop queued blocks."""
        if self.animation is not None:
            if self.animation.event_source is not None:
                self.animation.event_source.stop()
            self.animation = None
            print("DEBUG: Animation stopped.")
        if self.stream:
            self.stream.stop()
            self.stream.close()
            self.stream = None
            print("DEBUG: Audio stream stopped and closed.")
        with self.q.mutex:
            self.q.queue.clear()
        print("DEBUG: Queue cleared.")
        self.start_button.config(text="Start")

    def audio_callback(self, indata, frames, time, status):
        """sounddevice input callback: queue the first channel of the block.

        Args:
            indata: 2-D sample buffer (frames x channels) supplied by
                sounddevice.
            frames: Number of frames in ``indata``; unused.
            time: Timing information supplied by sounddevice; unused.
            status: Flags describing stream problems; reported on stderr when
                set.
        """
        if status:
            print(f"ERROR in audio_callback: {status}", file=sys.stderr)
        # The buffer behind ``indata`` is only valid during the callback, so
        # a copy (not a view) must be queued for the GUI thread.
        self.q.put(indata[:, 0].copy())

    def _latest_block(self):
        """Return the newest queued audio block, or ``None`` if none is queued.

        Older blocks are discarded: the callback can produce blocks faster
        than the display refreshes, and consuming only one per tick would let
        the queue (and the display latency) grow without bound.
        """
        latest = None
        while True:
            try:
                latest = self.q.get_nowait()
            except queue.Empty:
                return latest

    def update_plot(self, frame):
        """Animation tick: analyse the newest block and refresh the display.

        Args:
            frame: Frame counter supplied by ``FuncAnimation``; unused.

        Returns:
            The artists to re-blit: the spectrum and waveform lines, or an
            empty tuple when the lines have not been created yet.
        """
        if self.spectrum_line is None or self.waveform_line is None:
            return ()
        artists = (self.spectrum_line, self.waveform_line)

        data = self._latest_block()
        if data is None:
            return artists

        try:
            n = len(data)
            if n != self.window_size:
                print(f"WARN: Mismatched data size. Expected {self.window_size}, got {n}")
                return artists

            # Calculate RMS of time-domain signal
            self.rms = float(np.sqrt(np.mean(np.square(data))))

            magnitude = np.abs(rfft(data)) / n
            # Small offset keeps log10 finite for silent bins.
            magnitude_db = 20 * np.log10(magnitude + 1e-9)
            self.spectrum_line.set_ydata(magnitude_db)
            self.waveform_line.set_ydata(data)

            self.update_vu_meters(magnitude)

            # Push to Redis
            if self.redis_conn is not None:
                try:
                    self.redis_conn.set('Db', magnitude_db.tobytes())
                    self.redis_conn.set('wave', data.tobytes())
                except Exception as e:
                    print(f"Redis error: {e}")

        except Exception as e:
            # Keep the animation alive; a single bad frame is only reported.
            print(f"\nERROR in update_plot: {e}")

        return artists

    def update_vu_meters(self, magnitude):
        """
        Calculate frequency band levels and update the VU meters.

        The spectrum between 20 Hz and half the sample rate is divided into
        logarithmically spaced bands, one per VU meter. For each band the
        mean magnitude is converted to dB, mapped so that -60 dB -> 0 and
        0 dB -> 10 (clipped), and shown on the matching meter. The list of
        levels is stored in ``self.tenbins`` and, if a Redis connection is
        active, pushed to 'spectrum_10' together with 'rms'.

        Args:
            magnitude (np.ndarray): Linear magnitude spectrum with one entry
                per frequency in ``self.xf``.
        """
        if not self.vu_meters or self.xf is None:
            return

        num_bands = len(self.vu_meters)
        max_freq = self.samplerate / 2
        if max_freq <= 20:
            max_freq = 20000
        edges = np.logspace(np.log10(20), np.log10(max_freq), num_bands + 1)

        levels = []
        for meter, low_freq, high_freq in zip(self.vu_meters, edges[:-1], edges[1:]):
            band = magnitude[(self.xf >= low_freq) & (self.xf < high_freq)]
            avg_mag = np.mean(band) if band.size else 0.0

            # Convert linear magnitude to dB, then map -60 dB -> 0, 0 dB -> 10
            level_db = 20 * np.log10(avg_mag + 1e-9)
            level10 = np.clip((level_db + 60) / 6.0, 0.0, 10.0)

            levels.append(round(float(level10), 2))
            # VU meter expects 0-1 range
            meter.set_level(level10 / 10.0)

        self.tenbins = f"{levels}"

        # Push to Redis
        if self.redis_conn is not None:
            try:
                self.redis_conn.set('spectrum_10', self.tenbins)
                self.redis_conn.set('rms', f"{self.rms:.3f}")
            except Exception as e:
                print(f"Redis error: {e}")

    def on_closing(self):
        """Stop streaming, release the figure and destroy the window."""
        self.stop_stream()
        # The figure was created through pyplot, which keeps its own figure
        # manager alive; close it so nothing outlives the main window.
        plt.close(self.fig)
        self.destroy()
|
|
|
|
if __name__ == "__main__":
    # Build the main window, route the window-manager close button through
    # the same shutdown path as the Quit button, then enter the Tk event loop.
    app = AudioPlotterApp()
    app.protocol("WM_DELETE_WINDOW", app.on_closing)
    app.mainloop()
|