redilysis/redisplay.py

478 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Redisplay: Redilysis Audio Analysis UI
v0.2
This script provides a real-time audio visualization tool featuring a spectrum
analyzer, a waveform display, and multi-band VU meters. It captures audio from a
selected input device, processes it, and displays the analysis in a user-friendly
GUI built with Tkinter.
Key Features:
- Live waveform and spectrum plotting using Matplotlib.
- Configurable audio parameters (sample rate, window size).
- Variable display refresh rate.
- Device selection from available system audio inputs.
- VU meters for different frequency bands.
- Pushes analysis data to Redis, including spectrum, waveform, RMS, and frequency bins.
- Dark theme for the GUI using sv_ttk.
Redis Keys:
- 'Db': The full magnitude spectrum data (NumPy array, serialized to bytes).
- 'wave': The raw audio waveform data (NumPy array, serialized to bytes).
- 'spectrum_10': A string representation of a list of levels for the VU meter frequency bins (0-10).
- 'rms': The Root Mean Square of the raw audio signal (string).
Author: Sam
Date: 2025-07-15
License: to be defined
"""
import tkinter as tk
from tkinter import ttk, messagebox
import tkinter.font as tkfont
import sounddevice as sd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.animation import FuncAnimation
import queue
import sv_ttk
import sys
import redis
from scipy.fft import rfft, rfftfreq
refresh_rate = 25
class VUMeter(tk.Canvas):
def __init__(self, parent, *args, **kwargs):
super().__init__(parent, *args, **kwargs)
self.config(width=40, bg='#282828', highlightthickness=0)
self.level = 0
self.bind("<Configure>", self._draw_meter)
def _draw_meter(self, event=None):
self.delete("all")
width = self.winfo_width()
height = self.winfo_height()
num_segments = 20
segment_height = (height - 4) / num_segments
level_segment = int(self.level * num_segments)
for i in range(num_segments):
y0 = height - (i + 1) * segment_height
y1 = height - i * segment_height - 2
color = self._get_color(i, num_segments)
if i < level_segment:
self.create_rectangle(2, y0, width - 2, y1, fill=color, outline="")
def _get_color(self, index, total):
if index > total * 0.85:
return "#ff0000"
elif index > total * 0.7:
return "#ff4500"
elif index > total * 0.5:
return "#ffa500"
else:
return "#00dd00"
def set_level(self, level):
self.level = max(0.0, min(1.0, level))
self._draw_meter()
class AudioPlotterApp(tk.Tk):
def __init__(self):
super().__init__()
self.title("Redilysis")
self.geometry("1000x700")
sv_ttk.set_theme("dark")
self.stream = None
self.animation = None
self.q = queue.Queue()
self.vu_meters = []
self.samplerate = 44100 # Default, will be overwritten
self.spectrum_line = None
self.waveform_line = None
self.xf = None
# Define small font style for compact controls
self.small_font = tkfont.Font(size=9)
style = ttk.Style(self)
style.configure('Small.TLabel', font=self.small_font)
style.configure('Small.TEntry', font=self.small_font)
style.configure('Small.TButton', font=self.small_font)
style.configure('Small.TCombobox', font=self.small_font)
style.configure('Small.TLabelframe.Label', font=self.small_font)
# Apply small font to default ttk widget families in the app
self.option_add("*TLabel.Font", self.small_font)
self.option_add("*TEntry.Font", self.small_font)
self.option_add("*TButton.Font", self.small_font)
self.option_add("*TCombobox*Font", self.small_font)
self.option_add("*Labelframe.LabelFont", self.small_font)
self._create_widgets()
self._force_focus()
def _force_focus(self):
"""Force the window to the front, specifically for macOS."""
if sys.platform != "darwin":
return
self.lift()
self.attributes('-topmost', True)
self.after_idle(self.attributes, '-topmost', False)
self.focus_force()
def _create_widgets(self):
main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
main_pane.pack(fill=tk.BOTH, expand=True)
top_pane = ttk.PanedWindow(main_pane, orient=tk.HORIZONTAL)
main_pane.add(top_pane, weight=1)
controls_frame = ttk.Frame(top_pane, padding="10")
top_pane.add(controls_frame, weight=0)
vu_meter_frame = ttk.Frame(top_pane, padding="5")
top_pane.add(vu_meter_frame, weight=1)
self.plot_frame = ttk.Frame(main_pane)
main_pane.add(self.plot_frame, weight=4)
self._create_controls(controls_frame)
self._create_vu_meters_area(vu_meter_frame)
self._create_plot_area()
def _create_controls(self, parent):
parent.columnconfigure(0, weight=1)
current_row = 0
# --- Top frame for parameters (2 columns) ---
top_params_frame = ttk.Frame(parent)
top_params_frame.grid(row=current_row, column=0, sticky="ew", pady=5)
top_params_frame.columnconfigure((0, 1), weight=1)
current_row += 1
# Left column: Audio settings
audio_params_frame = ttk.LabelFrame(top_params_frame, text="Audio Parameters")
audio_params_frame.grid(row=0, column=0, padx=(0, 5), sticky="nsew")
audio_params_frame.columnconfigure(1, weight=1)
self.settings = {}
settings_map = {
'window_size': ('Window Size', 1024),
'downsample': ('Downsample', 1),
'vu_bands': ('VU Bands', 12),
'samplerate': ('Sample Rate', '44100'),
}
for i, (key, (text, default)) in enumerate(settings_map.items()):
ttk.Label(audio_params_frame, text=text, style='Small.TLabel').grid(row=i, column=0, padx=5, pady=1, sticky="w")
var = tk.StringVar(value=str(default))
entry = ttk.Entry(audio_params_frame, textvariable=var, style='Small.TEntry')
entry.grid(row=i, column=1, padx=5, pady=2, sticky="ew")
self.settings[key] = var
# Right column: Redis settings (visual only)
redis_params_frame = ttk.LabelFrame(top_params_frame, text="Redis Connection")
redis_params_frame.grid(row=0, column=1, padx=(5, 0), sticky="nsew")
redis_params_frame.columnconfigure(1, weight=1)
# Default Redis connection parameters
self.ip_var = tk.StringVar(value="127.0.0.1")
self.port_var = tk.StringVar(value="6379")
ttk.Label(redis_params_frame, text="IP Address:", style='Small.TLabel').grid(row=0, column=0, sticky="w", padx=5, pady=1)
ttk.Entry(redis_params_frame, textvariable=self.ip_var, style='Small.TEntry').grid(row=0, column=1, sticky="ew", padx=5, pady=1)
ttk.Label(redis_params_frame, text="Port:", style='Small.TLabel').grid(row=1, column=0, sticky="w", padx=5, pady=1)
ttk.Entry(redis_params_frame, textvariable=self.port_var, style='Small.TEntry').grid(row=1, column=1, sticky="ew", padx=5, pady=1)
ttk.Button(redis_params_frame, text="Connect", command=self.connect_redis, style='Small.TButton').grid(row=2, column=0, columnspan=2, pady=4, padx=5, sticky="ew")
# --- Middle frame: Device selector (full width) ---
device_frame = ttk.LabelFrame(parent, text="Audio Device")
device_frame.grid(row=current_row, column=0, sticky="ew", pady=5)
device_frame.columnconfigure(0, weight=1)
current_row += 1
self.device_var = tk.StringVar()
self.devices = self._get_devices()
self.device_menu = ttk.Combobox(device_frame, textvariable=self.device_var, values=list(self.devices.keys()), state='readonly', style='Small.TCombobox')
self.device_menu.grid(row=0, column=0, sticky="ew", padx=5, pady=5)
try:
self.device_menu.set(list(self.devices.keys())[sd.default.device['input']])
except (ValueError, IndexError, KeyError):
if self.devices: self.device_menu.set(list(self.devices.keys())[0])
# --- Bottom frame: Buttons (full width) ---
button_frame = ttk.Frame(parent)
button_frame.grid(row=current_row, column=0, sticky="ew", pady=(10,0))
button_frame.columnconfigure((0, 1), weight=1)
current_row += 1
self.start_button = ttk.Button(button_frame, text="Start", command=self.toggle_stream, style='Small.TButton')
self.start_button.grid(row=0, column=0, padx=5, sticky="ew")
self.quit_button = ttk.Button(button_frame, text="Quit", command=self.on_closing, style='Small.TButton')
self.quit_button.grid(row=0, column=1, padx=5, sticky="ew")
# Attempt initial Redis connection
self.connect_redis()
def connect_redis(self):
"""Establish connection to Redis using current IP/Port entries."""
try:
host = self.ip_var.get()
port = int(self.port_var.get())
self.redis_conn = redis.Redis(host=host, port=port, decode_responses=True)
self.redis_conn.ping()
print(f"Connected to Redis at {host}:{port}")
except Exception as e:
print(f"Failed to connect to Redis: {e}")
self.redis_conn = None
def _create_vu_meters_area(self, parent):
self.vu_meter_container = parent
self.vu_meter_container.bind("<Configure>", lambda e: self.recreate_vu_meters())
self.recreate_vu_meters()
def recreate_vu_meters(self):
for widget in self.vu_meter_container.winfo_children():
widget.destroy()
self.vu_meters.clear()
try:
num_bands = int(self.settings['vu_bands'].get())
except (ValueError, KeyError):
num_bands = 0
for i in range(num_bands):
vu_meter = VUMeter(self.vu_meter_container)
vu_meter.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=2)
self.vu_meters.append(vu_meter)
def _create_plot_area(self):
self.fig, (self.ax_spectrum, self.ax_waveform) = plt.subplots(
2, 1, gridspec_kw={'height_ratios': [2, 3]}
)
self.fig.patch.set_facecolor('#282828')
self.canvas = FigureCanvasTkAgg(self.fig, master=self.plot_frame)
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
self._configure_plots()
def _get_devices(self):
devices = sd.query_devices()
return {f"{i}: {d['name']}": i for i, d in enumerate(devices) if d['max_input_channels'] > 0}
def _configure_plots(self):
# Configure spectrum plot
self.ax_spectrum.clear()
self.ax_spectrum.set_facecolor('#282828')
self.ax_spectrum.set_xscale('log')
self.ax_spectrum.set_yscale('linear')
self.ax_spectrum.set_ylabel('Magnitude (dB)', color='white')
self.ax_spectrum.set_ylim(-60, 10)
self.ax_spectrum.set_xlim(20, self.samplerate / 2)
self.ax_spectrum.grid(True, color='gray', linestyle='--')
self.ax_spectrum.tick_params(axis='x', colors='white')
self.ax_spectrum.tick_params(axis='y', colors='white')
for spine in self.ax_spectrum.spines.values():
spine.set_edgecolor('white')
# Configure waveform plot
self.ax_waveform.clear()
self.ax_waveform.set_facecolor('#282828')
self.ax_waveform.set_xlabel('Time (Samples)', color='white')
self.ax_waveform.set_ylabel('Amplitude', color='white')
self.ax_waveform.set_ylim(-1, 1)
self.ax_waveform.grid(True, color='gray', linestyle='--')
self.ax_waveform.tick_params(axis='x', colors='white')
self.ax_waveform.tick_params(axis='y', colors='white')
for spine in self.ax_waveform.spines.values():
spine.set_edgecolor('white')
self.fig.tight_layout(pad=1.5)
self.canvas.draw()
def toggle_stream(self):
if self.stream is None:
self.start_stream()
else:
self.stop_stream()
def start_stream(self):
print("DEBUG: Attempting to start stream...")
try:
self.recreate_vu_meters()
device_name = self.device_var.get()
device_id = self.devices[device_name]
device_info = sd.query_devices(device_id, 'input')
print(f"DEBUG: Selected device: {device_name} (ID: {device_id})")
try:
self.samplerate = float(self.settings['samplerate'].get())
print(f"DEBUG: Using user-defined sample rate: {self.samplerate} Hz")
except (ValueError, KeyError):
self.samplerate = device_info['default_samplerate']
self.settings['samplerate'].set(str(int(self.samplerate)))
print(f"DEBUG: Invalid sample rate, using device default: {self.samplerate} Hz")
self.window_size = int(self.settings['window_size'].get())
print(f"DEBUG: Using window size: {self.window_size}")
# Pre-calculate frequency bins
self.xf = rfftfreq(self.window_size, 1 / self.samplerate)
# Configure plots and pre-create line objects
self._configure_plots()
self.ax_waveform.set_xlim(0, self.window_size)
self.spectrum_line, = self.ax_spectrum.plot(self.xf, np.zeros_like(self.xf), color='#00dd00')
self.waveform_line, = self.ax_waveform.plot(np.arange(self.window_size), np.zeros(self.window_size), color='#00dd00')
self.stream = sd.InputStream(
device=device_id, channels=1,
samplerate=self.samplerate, callback=self.audio_callback,
blocksize=self.window_size, latency='low')
self.animation = FuncAnimation(self.fig, self.update_plot, interval=refresh_rate, blit=True, cache_frame_data=False, save_count=sys.maxsize)
self.stream.start()
self.start_button.config(text="Stop")
print("DEBUG: Stream started successfully.")
except Exception as e:
print(f"ERROR: Failed to start stream: {e}")
messagebox.showerror("Error starting stream", str(e))
self.stop_stream()
def stop_stream(self):
if getattr(self, 'animation', None):
self.animation.event_source.stop()
self.animation = None
print("DEBUG: Animation stopped.")
if self.stream:
self.stream.stop()
self.stream.close()
self.stream = None
print("DEBUG: Audio stream stopped and closed.")
with self.q.mutex:
self.q.queue.clear()
print("DEBUG: Queue cleared.")
self.start_button.config(text="Start")
def audio_callback(self, indata, frames, time, status):
if status:
print(f"ERROR in audio_callback: {status}", file=sys.stderr)
if status:
print(f"ERROR in audio_callback: {status}", file=sys.stderr)
self.q.put(indata[:, 0])
def update_plot(self, frame):
try:
data = self.q.get_nowait()
except queue.Empty:
return (self.spectrum_line, self.waveform_line)
if not self.spectrum_line or not self.waveform_line:
return (self.spectrum_line, self.waveform_line)
try:
N = len(data)
if N != self.window_size:
print(f"WARN: Mismatched data size. Expected {self.window_size}, got {N}")
return (self.plot_line,)
# Calculate RMS of time-domain signal
self.rms = float(np.sqrt(np.mean(np.square(data))))
yf = rfft(data)
magnitude_db = 20 * np.log10(np.abs(yf) / N + 1e-9)
self.spectrum_line.set_ydata(magnitude_db)
self.waveform_line.set_ydata(data)
# print(f"\rDEBUG: Queue: {self.q.qsize()}, Plot max: {np.max(magnitude_db):.2f} dB", end='')
self.update_vu_meters(np.abs(yf) / N)
# Push to Redis
if getattr(self, 'redis_conn', None):
try:
self.redis_conn.set('Db', magnitude_db.tobytes())
self.redis_conn.set('wave', data.tobytes())
except Exception as e:
print(f"Redis error: {e}")
except Exception as e:
print(f"\nERROR in update_plot: {e}")
return (self.spectrum_line, self.waveform_line)
def update_vu_meters(self, magnitude):
"""
Calculate frequency band levels and update the VU meters.
The method divides the frequency spectrum into logarithmic bins, calculates the
average magnitude for each bin, converts it to a scaled level (0-10), and
updates the corresponding VU meter. It also pushes the resulting list of
levels to a Redis key named 'spectrum_10' if a connection is active.
Args:
magnitude (np.ndarray): The magnitude spectrum of the audio data.
"""
if not self.vu_meters or self.xf is None:
return
num_bands = len(self.vu_meters)
max_freq = self.samplerate / 2
if max_freq <= 20:
max_freq = 20000
freq_bins = np.logspace(np.log10(20), np.log10(max_freq), num_bands + 1)
levels = []
for i in range(num_bands):
low_freq, high_freq = freq_bins[i], freq_bins[i+1]
idx = np.where((self.xf >= low_freq) & (self.xf < high_freq))
if len(idx[0]) > 0:
avg_mag = np.mean(magnitude[idx])
else:
avg_mag = 0.0
# Convert linear magnitude to dB
level_db = 20 * np.log10(avg_mag + 1e-9) # -inf..0
# Map -60 dB → 0 and 0 dB → 10
level10 = np.clip((level_db + 60) / 6.0, 0.0, 10.0)
levels.append(round(float(level10), 2))
# Vu meter expects 0-1 range
self.vu_meters[i].set_level(level10 / 10.0)
# Build text representation and print once per call
self.tenbins = f"{levels}"
# Debug print
# print(f"tenbins={self.tenbins}, rms={self.rms:.3f}")
# print(levels[0], levels[1], levels[2])
# Push to Redis
if getattr(self, 'redis_conn', None):
try:
self.redis_conn.set('spectrum_10', self.tenbins)
self.redis_conn.set('rms', f"{self.rms:.3f}")
except Exception as e:
print(f"Redis error: {e}")
def on_closing(self):
self.stop_stream()
self.destroy()
if __name__ == "__main__":
app = AudioPlotterApp()
app.protocol("WM_DELETE_WINDOW", app.on_closing)
app.mainloop()