#!/usr/bin/env python3 """ Redisplay: Redilysis Audio Analysis UI v0.2 This script provides a real-time audio visualization tool featuring a spectrum analyzer, a waveform display, and multi-band VU meters. It captures audio from a selected input device, processes it, and displays the analysis in a user-friendly GUI built with Tkinter. Key Features: - Live waveform and spectrum plotting using Matplotlib. - Configurable audio parameters (sample rate, window size). - Variable display refresh rate. - Device selection from available system audio inputs. - VU meters for different frequency bands. - Pushes analysis data to Redis, including spectrum, waveform, RMS, and frequency bins. - Dark theme for the GUI using sv_ttk. Redis Keys: - 'Db': The full magnitude spectrum data (NumPy array, serialized to bytes). - 'wave': The raw audio waveform data (NumPy array, serialized to bytes). - 'spectrum_10': A string representation of a list of levels for the VU meter frequency bins (0-10). - 'rms': The Root Mean Square of the raw audio signal (string). Author: Sam Date: 2025-07-15 License: to be defined """ import tkinter as tk from tkinter import ttk, messagebox import tkinter.font as tkfont import sounddevice as sd import numpy as np import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.animation import FuncAnimation import queue import sv_ttk import sys import redis from scipy.fft import rfft, rfftfreq refresh_rate = 25 class VUMeter(tk.Canvas): def __init__(self, parent, *args, **kwargs): super().__init__(parent, *args, **kwargs) self.config(width=40, bg='#282828', highlightthickness=0) self.level = 0 self.bind("", self._draw_meter) def _draw_meter(self, event=None): self.delete("all") width = self.winfo_width() height = self.winfo_height() num_segments = 20 segment_height = (height - 4) / num_segments level_segment = int(self.level * num_segments) for i in range(num_segments): y0 = height - (i + 1) * segment_height y1 = height - i * segment_height - 2 color = self._get_color(i, num_segments) if i < level_segment: self.create_rectangle(2, y0, width - 2, y1, fill=color, outline="") def _get_color(self, index, total): if index > total * 0.85: return "#ff0000" elif index > total * 0.7: return "#ff4500" elif index > total * 0.5: return "#ffa500" else: return "#00dd00" def set_level(self, level): self.level = max(0.0, min(1.0, level)) self._draw_meter() class AudioPlotterApp(tk.Tk): def __init__(self): super().__init__() self.title("Redilysis") self.geometry("1000x700") sv_ttk.set_theme("dark") self.stream = None self.animation = None self.q = queue.Queue() self.vu_meters = [] self.samplerate = 44100 # Default, will be overwritten self.spectrum_line = None self.waveform_line = None self.xf = None # Define small font style for compact controls self.small_font = tkfont.Font(size=9) style = ttk.Style(self) style.configure('Small.TLabel', font=self.small_font) style.configure('Small.TEntry', font=self.small_font) style.configure('Small.TButton', font=self.small_font) style.configure('Small.TCombobox', font=self.small_font) style.configure('Small.TLabelframe.Label', font=self.small_font) # Apply small font to default ttk widget families in the app self.option_add("*TLabel.Font", self.small_font) self.option_add("*TEntry.Font", self.small_font) self.option_add("*TButton.Font", self.small_font) self.option_add("*TCombobox*Font", self.small_font) self.option_add("*Labelframe.LabelFont", self.small_font) self._create_widgets() self._force_focus() def _force_focus(self): """Force the window to the front, specifically for macOS.""" if sys.platform != "darwin": return self.lift() self.attributes('-topmost', True) self.after_idle(self.attributes, '-topmost', False) self.focus_force() def _create_widgets(self): main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL) main_pane.pack(fill=tk.BOTH, expand=True) top_pane = ttk.PanedWindow(main_pane, orient=tk.HORIZONTAL) main_pane.add(top_pane, weight=1) controls_frame = ttk.Frame(top_pane, padding="10") top_pane.add(controls_frame, weight=0) vu_meter_frame = ttk.Frame(top_pane, padding="5") top_pane.add(vu_meter_frame, weight=1) self.plot_frame = ttk.Frame(main_pane) main_pane.add(self.plot_frame, weight=4) self._create_controls(controls_frame) self._create_vu_meters_area(vu_meter_frame) self._create_plot_area() def _create_controls(self, parent): parent.columnconfigure(0, weight=1) current_row = 0 # --- Top frame for parameters (2 columns) --- top_params_frame = ttk.Frame(parent) top_params_frame.grid(row=current_row, column=0, sticky="ew", pady=5) top_params_frame.columnconfigure((0, 1), weight=1) current_row += 1 # Left column: Audio settings audio_params_frame = ttk.LabelFrame(top_params_frame, text="Audio Parameters") audio_params_frame.grid(row=0, column=0, padx=(0, 5), sticky="nsew") audio_params_frame.columnconfigure(1, weight=1) self.settings = {} settings_map = { 'window_size': ('Window Size', 1024), 'downsample': ('Downsample', 1), 'vu_bands': ('VU Bands', 12), 'samplerate': ('Sample Rate', '44100'), } for i, (key, (text, default)) in enumerate(settings_map.items()): ttk.Label(audio_params_frame, text=text, style='Small.TLabel').grid(row=i, column=0, padx=5, pady=1, sticky="w") var = tk.StringVar(value=str(default)) entry = ttk.Entry(audio_params_frame, textvariable=var, style='Small.TEntry') entry.grid(row=i, column=1, padx=5, pady=2, sticky="ew") self.settings[key] = var # Right column: Redis settings (visual only) redis_params_frame = ttk.LabelFrame(top_params_frame, text="Redis Connection") redis_params_frame.grid(row=0, column=1, padx=(5, 0), sticky="nsew") redis_params_frame.columnconfigure(1, weight=1) # Default Redis connection parameters self.ip_var = tk.StringVar(value="127.0.0.1") self.port_var = tk.StringVar(value="6379") ttk.Label(redis_params_frame, text="IP Address:", style='Small.TLabel').grid(row=0, column=0, sticky="w", padx=5, pady=1) ttk.Entry(redis_params_frame, textvariable=self.ip_var, style='Small.TEntry').grid(row=0, column=1, sticky="ew", padx=5, pady=1) ttk.Label(redis_params_frame, text="Port:", style='Small.TLabel').grid(row=1, column=0, sticky="w", padx=5, pady=1) ttk.Entry(redis_params_frame, textvariable=self.port_var, style='Small.TEntry').grid(row=1, column=1, sticky="ew", padx=5, pady=1) ttk.Button(redis_params_frame, text="Connect", command=self.connect_redis, style='Small.TButton').grid(row=2, column=0, columnspan=2, pady=4, padx=5, sticky="ew") # --- Middle frame: Device selector (full width) --- device_frame = ttk.LabelFrame(parent, text="Audio Device") device_frame.grid(row=current_row, column=0, sticky="ew", pady=5) device_frame.columnconfigure(0, weight=1) current_row += 1 self.device_var = tk.StringVar() self.devices = self._get_devices() self.device_menu = ttk.Combobox(device_frame, textvariable=self.device_var, values=list(self.devices.keys()), state='readonly', style='Small.TCombobox') self.device_menu.grid(row=0, column=0, sticky="ew", padx=5, pady=5) try: self.device_menu.set(list(self.devices.keys())[sd.default.device['input']]) except (ValueError, IndexError, KeyError): if self.devices: self.device_menu.set(list(self.devices.keys())[0]) # --- Bottom frame: Buttons (full width) --- button_frame = ttk.Frame(parent) button_frame.grid(row=current_row, column=0, sticky="ew", pady=(10,0)) button_frame.columnconfigure((0, 1), weight=1) current_row += 1 self.start_button = ttk.Button(button_frame, text="Start", command=self.toggle_stream, style='Small.TButton') self.start_button.grid(row=0, column=0, padx=5, sticky="ew") self.quit_button = ttk.Button(button_frame, text="Quit", command=self.on_closing, style='Small.TButton') self.quit_button.grid(row=0, column=1, padx=5, sticky="ew") # Attempt initial Redis connection self.connect_redis() def connect_redis(self): """Establish connection to Redis using current IP/Port entries.""" try: host = self.ip_var.get() port = int(self.port_var.get()) self.redis_conn = redis.Redis(host=host, port=port, decode_responses=True) self.redis_conn.ping() print(f"Connected to Redis at {host}:{port}") except Exception as e: print(f"Failed to connect to Redis: {e}") self.redis_conn = None def _create_vu_meters_area(self, parent): self.vu_meter_container = parent self.vu_meter_container.bind("", lambda e: self.recreate_vu_meters()) self.recreate_vu_meters() def recreate_vu_meters(self): for widget in self.vu_meter_container.winfo_children(): widget.destroy() self.vu_meters.clear() try: num_bands = int(self.settings['vu_bands'].get()) except (ValueError, KeyError): num_bands = 0 for i in range(num_bands): vu_meter = VUMeter(self.vu_meter_container) vu_meter.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=2) self.vu_meters.append(vu_meter) def _create_plot_area(self): self.fig, (self.ax_spectrum, self.ax_waveform) = plt.subplots( 2, 1, gridspec_kw={'height_ratios': [2, 3]} ) self.fig.patch.set_facecolor('#282828') self.canvas = FigureCanvasTkAgg(self.fig, master=self.plot_frame) self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) self._configure_plots() def _get_devices(self): devices = sd.query_devices() return {f"{i}: {d['name']}": i for i, d in enumerate(devices) if d['max_input_channels'] > 0} def _configure_plots(self): # Configure spectrum plot self.ax_spectrum.clear() self.ax_spectrum.set_facecolor('#282828') self.ax_spectrum.set_xscale('log') self.ax_spectrum.set_yscale('linear') self.ax_spectrum.set_ylabel('Magnitude (dB)', color='white') self.ax_spectrum.set_ylim(-60, 10) self.ax_spectrum.set_xlim(20, self.samplerate / 2) self.ax_spectrum.grid(True, color='gray', linestyle='--') self.ax_spectrum.tick_params(axis='x', colors='white') self.ax_spectrum.tick_params(axis='y', colors='white') for spine in self.ax_spectrum.spines.values(): spine.set_edgecolor('white') # Configure waveform plot self.ax_waveform.clear() self.ax_waveform.set_facecolor('#282828') self.ax_waveform.set_xlabel('Time (Samples)', color='white') self.ax_waveform.set_ylabel('Amplitude', color='white') self.ax_waveform.set_ylim(-1, 1) self.ax_waveform.grid(True, color='gray', linestyle='--') self.ax_waveform.tick_params(axis='x', colors='white') self.ax_waveform.tick_params(axis='y', colors='white') for spine in self.ax_waveform.spines.values(): spine.set_edgecolor('white') self.fig.tight_layout(pad=1.5) self.canvas.draw() def toggle_stream(self): if self.stream is None: self.start_stream() else: self.stop_stream() def start_stream(self): print("DEBUG: Attempting to start stream...") try: self.recreate_vu_meters() device_name = self.device_var.get() device_id = self.devices[device_name] device_info = sd.query_devices(device_id, 'input') print(f"DEBUG: Selected device: {device_name} (ID: {device_id})") try: self.samplerate = float(self.settings['samplerate'].get()) print(f"DEBUG: Using user-defined sample rate: {self.samplerate} Hz") except (ValueError, KeyError): self.samplerate = device_info['default_samplerate'] self.settings['samplerate'].set(str(int(self.samplerate))) print(f"DEBUG: Invalid sample rate, using device default: {self.samplerate} Hz") self.window_size = int(self.settings['window_size'].get()) print(f"DEBUG: Using window size: {self.window_size}") # Pre-calculate frequency bins self.xf = rfftfreq(self.window_size, 1 / self.samplerate) # Configure plots and pre-create line objects self._configure_plots() self.ax_waveform.set_xlim(0, self.window_size) self.spectrum_line, = self.ax_spectrum.plot(self.xf, np.zeros_like(self.xf), color='#00dd00') self.waveform_line, = self.ax_waveform.plot(np.arange(self.window_size), np.zeros(self.window_size), color='#00dd00') self.stream = sd.InputStream( device=device_id, channels=1, samplerate=self.samplerate, callback=self.audio_callback, blocksize=self.window_size, latency='low') self.animation = FuncAnimation(self.fig, self.update_plot, interval=refresh_rate, blit=True, cache_frame_data=False, save_count=sys.maxsize) self.stream.start() self.start_button.config(text="Stop") print("DEBUG: Stream started successfully.") except Exception as e: print(f"ERROR: Failed to start stream: {e}") messagebox.showerror("Error starting stream", str(e)) self.stop_stream() def stop_stream(self): if getattr(self, 'animation', None): self.animation.event_source.stop() self.animation = None print("DEBUG: Animation stopped.") if self.stream: self.stream.stop() self.stream.close() self.stream = None print("DEBUG: Audio stream stopped and closed.") with self.q.mutex: self.q.queue.clear() print("DEBUG: Queue cleared.") self.start_button.config(text="Start") def audio_callback(self, indata, frames, time, status): if status: print(f"ERROR in audio_callback: {status}", file=sys.stderr) if status: print(f"ERROR in audio_callback: {status}", file=sys.stderr) self.q.put(indata[:, 0]) def update_plot(self, frame): try: data = self.q.get_nowait() except queue.Empty: return (self.spectrum_line, self.waveform_line) if not self.spectrum_line or not self.waveform_line: return (self.spectrum_line, self.waveform_line) try: N = len(data) if N != self.window_size: print(f"WARN: Mismatched data size. Expected {self.window_size}, got {N}") return (self.plot_line,) # Calculate RMS of time-domain signal self.rms = float(np.sqrt(np.mean(np.square(data)))) yf = rfft(data) magnitude_db = 20 * np.log10(np.abs(yf) / N + 1e-9) self.spectrum_line.set_ydata(magnitude_db) self.waveform_line.set_ydata(data) # print(f"\rDEBUG: Queue: {self.q.qsize()}, Plot max: {np.max(magnitude_db):.2f} dB", end='') self.update_vu_meters(np.abs(yf) / N) # Push to Redis if getattr(self, 'redis_conn', None): try: self.redis_conn.set('Db', magnitude_db.tobytes()) self.redis_conn.set('wave', data.tobytes()) except Exception as e: print(f"Redis error: {e}") except Exception as e: print(f"\nERROR in update_plot: {e}") return (self.spectrum_line, self.waveform_line) def update_vu_meters(self, magnitude): """ Calculate frequency band levels and update the VU meters. The method divides the frequency spectrum into logarithmic bins, calculates the average magnitude for each bin, converts it to a scaled level (0-10), and updates the corresponding VU meter. It also pushes the resulting list of levels to a Redis key named 'spectrum_10' if a connection is active. Args: magnitude (np.ndarray): The magnitude spectrum of the audio data. """ if not self.vu_meters or self.xf is None: return num_bands = len(self.vu_meters) max_freq = self.samplerate / 2 if max_freq <= 20: max_freq = 20000 freq_bins = np.logspace(np.log10(20), np.log10(max_freq), num_bands + 1) levels = [] for i in range(num_bands): low_freq, high_freq = freq_bins[i], freq_bins[i+1] idx = np.where((self.xf >= low_freq) & (self.xf < high_freq)) if len(idx[0]) > 0: avg_mag = np.mean(magnitude[idx]) else: avg_mag = 0.0 # Convert linear magnitude to dB level_db = 20 * np.log10(avg_mag + 1e-9) # -inf..0 # Map -60 dB → 0 and 0 dB → 10 level10 = np.clip((level_db + 60) / 6.0, 0.0, 10.0) levels.append(round(float(level10), 2)) # Vu meter expects 0-1 range self.vu_meters[i].set_level(level10 / 10.0) # Build text representation and print once per call self.tenbins = f"{levels}" # Debug print # print(f"tenbins={self.tenbins}, rms={self.rms:.3f}") # print(levels[0], levels[1], levels[2]) # Push to Redis if getattr(self, 'redis_conn', None): try: self.redis_conn.set('spectrum_10', self.tenbins) self.redis_conn.set('rms', f"{self.rms:.3f}") except Exception as e: print(f"Redis error: {e}") def on_closing(self): self.stop_stream() self.destroy() if __name__ == "__main__": app = AudioPlotterApp() app.protocol("WM_DELETE_WINDOW", app.on_closing) app.mainloop()