python3 version and UI version with less computation
This commit is contained in:
parent
509164ad5b
commit
93a4279dd7
299
redilysis.3.py
Executable file
299
redilysis.3.py
Executable file
@ -0,0 +1,299 @@
|
||||
"""
|
||||
Sends live audio analysis to the terminal.
|
||||
|
||||
Based on musicinformationretrieval.com/realtime_spectrogram.py
|
||||
|
||||
For more examples using PyAudio:
|
||||
https://github.com/mwickert/scikit-dsp-comm/blob/master/sk_dsp_comm/pyaudio_helper.py
|
||||
"""
|
||||
|
||||
from __future__ import print_function
|
||||
import argparse
|
||||
import json
|
||||
import librosa
|
||||
import math
|
||||
import numpy
|
||||
import os
|
||||
import pyaudio
|
||||
import redis
|
||||
import statistics
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
||||
def debug(*args, **kwargs):
|
||||
if( verbose == False ):
|
||||
return
|
||||
print(*args, file=sys.stderr, **kwargs)
|
||||
|
||||
|
||||
# Define default variables.
|
||||
BAND_OCTAVES = 10 # 12 * 9 octaves
|
||||
_BAND_TONES = BAND_OCTAVES * 12 # octaves * notes per octave
|
||||
_CHANNELS = 1
|
||||
_FRAMES_PER_BUFFER = 4410
|
||||
_N_FFT = 4096
|
||||
_RATE = 44100
|
||||
_SAMPLING_FREQUENCY = 0.1
|
||||
_BPM_MIN=10
|
||||
_BPM_MAX=400
|
||||
|
||||
# Argument parsing
|
||||
parser = argparse.ArgumentParser(prog='realtime_redis')
|
||||
# Standard Args
|
||||
parser.add_argument("-v","--verbose",action="store_true",help="Verbose")
|
||||
# Redis Args
|
||||
parser.add_argument("-i","--ip",help="IP address of the Redis server ",default="127.0.0.1",type=str)
|
||||
parser.add_argument("-p","--port",help="Port of the Redis server ",default="6379",type=str)
|
||||
# Audio Capture Args
|
||||
parser.add_argument('--list-devices','-L', action='store_true', help='Which devices are detected by pyaudio')
|
||||
parser.add_argument('--mode','-m', required=False, default='spectrum', choices=['spectrum', 'bpm'], type=str, help='Which mode to use. Default=spectrum')
|
||||
parser.add_argument('--device','-d', required=False, type=int, help='Which pyaudio device to use')
|
||||
parser.add_argument('--sampling-frequency','-s', required=False, default=0.1, type=float, help='Which frequency, in seconds. Default={}f '.format(_SAMPLING_FREQUENCY))
|
||||
parser.add_argument('--channels','-c', required=False, default=_CHANNELS, type=int, help='How many channels. Default={} '.format(_CHANNELS))
|
||||
parser.add_argument('--rate','-r', required=False, default=44100, type=int, help='The audio capture rate in Hz. Default={} '.format(_RATE))
|
||||
parser.add_argument('--frames','-f', required=False, default=4410, type=int, help='How many frames per buffer. Default={}'.format(_FRAMES_PER_BUFFER))
|
||||
# BPM Mode Args
|
||||
parser.add_argument('--bpm-min', required=False, default=_BPM_MIN, type=int, help='BPM mode only. The low BPM threshold. Default={} '.format(_BPM_MIN))
|
||||
parser.add_argument('--bpm-max', required=False, default=_BPM_MAX, type=int, help='BPM mode only. The high BPM threshold. Default={} '.format(_BPM_MAX))
|
||||
# Link option
|
||||
parser.add_argument('-link',help="Enable Ableton Link (disabled by default)", dest='link', action='store_true')
|
||||
parser.set_defaults(link=False)
|
||||
|
||||
#parser.add_argument('--link','-a',required=False, default=False, action='store_true',help='Allow link update. Default = False')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
# global
|
||||
bpm = 120.0
|
||||
start = 0
|
||||
|
||||
# Set real variables
|
||||
F_LO = librosa.note_to_hz('C0')
|
||||
F_HI = librosa.note_to_hz('C10')
|
||||
BAND_TONES = _BAND_TONES
|
||||
N_FFT = _N_FFT
|
||||
CHANNELS = args.channels
|
||||
DEVICE = args.device
|
||||
FRAMES_PER_BUFFER = int(args.rate * args.sampling_frequency )
|
||||
LIST_DEVICES = args.list_devices
|
||||
MODE = args.mode
|
||||
RATE = args.rate
|
||||
SAMPLING_FREQUENCY = args.sampling_frequency
|
||||
bpm_min = args.bpm_min
|
||||
bpm_max = args.bpm_max
|
||||
ip = args.ip
|
||||
port = args.port
|
||||
verbose = args.verbose
|
||||
|
||||
if( MODE == "bpm" and SAMPLING_FREQUENCY < 0.5 ):
|
||||
debug( "You should use a --sampling_frequency superior to 0.5 in BPM mode...")
|
||||
|
||||
# with Ableton Link
|
||||
if args.link == True:
|
||||
from libs import alink
|
||||
alink.Start()
|
||||
linked = True
|
||||
else:
|
||||
linked = False
|
||||
|
||||
melFilter = librosa.filters.mel(
|
||||
sr=RATE,
|
||||
n_fft=N_FFT,
|
||||
n_mels=BAND_TONES,
|
||||
fmin=F_LO,
|
||||
fmax=F_HI
|
||||
)
|
||||
r = redis.Redis(
|
||||
host=ip,
|
||||
port=port)
|
||||
|
||||
# Early exit to list devices
|
||||
# As it may crash later if not properly configured
|
||||
#
|
||||
def list_devices():
|
||||
# List all audio input devices
|
||||
p = pyaudio.PyAudio()
|
||||
i = 0
|
||||
n = p.get_device_count()
|
||||
print("\nFound {} devices\n".format(n))
|
||||
print(" {} {}".format('ID', 'Device name'))
|
||||
while i < n:
|
||||
dev = p.get_device_info_by_index(i)
|
||||
if dev['maxInputChannels'] > 0:
|
||||
print(" {} {}".format(i, dev['name']))
|
||||
i += 1
|
||||
if( LIST_DEVICES ):
|
||||
list_devices()
|
||||
os._exit(1)
|
||||
|
||||
|
||||
|
||||
def m_bpm(audio_data):
|
||||
"""
|
||||
This function saves slow analysis to redis
|
||||
* bpm
|
||||
* beat
|
||||
"""
|
||||
global bpm
|
||||
global start
|
||||
|
||||
|
||||
# Detect tempo / bpm
|
||||
new_bpm, beats = librosa.beat.beat_track(
|
||||
y = audio_data,
|
||||
sr = RATE,
|
||||
trim = False,
|
||||
#start_bpm = bpm,
|
||||
units = "time"
|
||||
)
|
||||
'''
|
||||
new_bpm = librosa.beat.tempo(y = audio_data, sr=RATE)[0]
|
||||
|
||||
'''
|
||||
# Correct the eventual octave error
|
||||
if new_bpm < bpm_min or new_bpm > bpm_max:
|
||||
found = False
|
||||
octaveErrorList = [ 0.5, 2, 0.3333, 3 ]
|
||||
for key,factor in enumerate(octaveErrorList):
|
||||
correction = new_bpm * factor
|
||||
if correction > bpm_min and correction < bpm_max:
|
||||
debug( "Corrected high/low bpm:{} to:{}".format(new_bpm, correction))
|
||||
new_bpm = correction
|
||||
found = True
|
||||
break
|
||||
if found == False:
|
||||
if new_bpm < bpm_min :
|
||||
new_bpm = bpm_min
|
||||
else :
|
||||
new_bpm = bpm_max
|
||||
|
||||
if args.link == True:
|
||||
alink.newtempo(new_bpm)
|
||||
|
||||
debug("new_bpm:{}".format(new_bpm))
|
||||
'''
|
||||
How to guess the next beats based on the data sent to redis
|
||||
~~ A Dirty Graph ~~
|
||||
|
||||
|start end|
|
||||
Capture |........................|
|
||||
BPM detect+Redis set ||
|
||||
Client Redis get |
|
||||
|
||||
Time |........................||.............|
|
||||
---SAMPLING_FREQUENCY----
|
||||
- < TIME-START
|
||||
Read Delay --------------- < 2*SAMPLING_FREQUENCY - PTTL
|
||||
Delay -----------------------------------------
|
||||
Beats |last beat
|
||||
. known ...b....b....b....b....b.
|
||||
. passed (...b....b....b.)
|
||||
. guessed (..b....b....b....b...
|
||||
Next Beat Calculation b....b....b....b.|..b
|
||||
Beats |last beat
|
||||
0 1 2 3 4
|
||||
|
||||
Redis:
|
||||
|
||||
key bpm_sample_interval
|
||||
visual |........................|
|
||||
|
||||
key bpm_delay
|
||||
visual |.........................|
|
||||
|
||||
'''
|
||||
bpm = new_bpm
|
||||
bpm_sample_interval = SAMPLING_FREQUENCY * 1000
|
||||
bpm_delay = (SAMPLING_FREQUENCY + time.time() - start ) * 1000
|
||||
pexpireat = int( 2 * bpm_sample_interval);
|
||||
# Save to Redis
|
||||
r.set( 'bpm', round(bpm,2), px = pexpireat )
|
||||
r.set( 'bpm_sample_interval', bpm_sample_interval )
|
||||
r.set( 'bpm_delay', bpm_delay )
|
||||
r.set( 'beats', json.dumps( beats.tolist() ) )
|
||||
#debug( "pexpireat:{}".format(pexpireat))
|
||||
debug( "bpm:{} bpm_delay:{} bpm_sample_interval:{} beats:{}".format(bpm,bpm_delay,bpm_sample_interval,beats) )
|
||||
return True
|
||||
|
||||
def m_spectrum(audio_data):
|
||||
"""
|
||||
This function saves fast analysis to redis
|
||||
"""
|
||||
|
||||
# Compute real FFT.
|
||||
fft = numpy.fft.rfft(audio_data, n=N_FFT)
|
||||
|
||||
# Compute mel spectrum.
|
||||
melspectrum = melFilter.dot(abs(fft))
|
||||
|
||||
|
||||
# Initialize output characters to display.
|
||||
spectrum_120 = [0]*BAND_TONES
|
||||
spectrum_10 = [0]*BAND_OCTAVES
|
||||
spectrum_oct = [[] for i in range(10)]
|
||||
|
||||
# Assign values
|
||||
for i in range(BAND_TONES):
|
||||
val = round(melspectrum[i],2)
|
||||
spectrum_120[i] = val
|
||||
key = int(math.floor( i / 12 ))
|
||||
spectrum_oct[key].append(val)
|
||||
|
||||
for i in range(BAND_OCTAVES):
|
||||
spectrum_10[i] = round(sum( spectrum_oct[i] ) / len( spectrum_oct[i]),2)
|
||||
|
||||
# Get RMS
|
||||
#rms = librosa.feature.rms( S=melspectrum )
|
||||
rms = librosa.feature.rms( y=audio_data ).tolist()[0]
|
||||
rms_avg = round(sum(rms) / len(rms),2)
|
||||
|
||||
# Save to redis
|
||||
#debug( 'spectrum_120:{} '.format(spectrum_120))
|
||||
debug( 'spectrum_10:{}'.format(spectrum_10))
|
||||
#debug( 'rms:{}'.format(rms_avg))
|
||||
r.set( 'spectrum_120', json.dumps( spectrum_120 ) )
|
||||
r.set( 'spectrum_10', json.dumps( spectrum_10 ) )
|
||||
r.set( 'rms', "{}".format(rms_avg) )
|
||||
return True
|
||||
|
||||
|
||||
def callback(in_data, frame_count, time_info, status):
|
||||
audio_data = numpy.frombuffer(in_data, dtype=numpy.float32)
|
||||
|
||||
global start
|
||||
start = time.time()
|
||||
if MODE == 'spectrum':
|
||||
m_spectrum(audio_data)
|
||||
elif MODE == 'bpm':
|
||||
m_bpm( audio_data)
|
||||
else:
|
||||
debug( "Unknown mode. Exiting")
|
||||
os._exit(2)
|
||||
end = time.time()
|
||||
#debug ("\rLoop took {:.2}s on {}s ".format(end - start, SAMPLING_FREQUENCY))
|
||||
return (in_data, pyaudio.paContinue)
|
||||
|
||||
|
||||
debug( "\n\nRunning! Using mode {}.\n\n".format(MODE))
|
||||
|
||||
p = pyaudio.PyAudio()
|
||||
stream = p.open(format=pyaudio.paFloat32,
|
||||
channels=CHANNELS,
|
||||
rate=RATE,
|
||||
input=True, # Do record input.
|
||||
output=False, # Do not play back output.
|
||||
frames_per_buffer=FRAMES_PER_BUFFER,
|
||||
input_device_index = DEVICE,
|
||||
stream_callback=callback)
|
||||
|
||||
stream.start_stream()
|
||||
|
||||
while stream.is_active():
|
||||
time.sleep(SAMPLING_FREQUENCY)
|
||||
|
||||
stream.stop_stream()
|
||||
stream.close()
|
||||
|
||||
p.terminate()
|
14
redilysis.py
14
redilysis.py
@ -81,7 +81,14 @@ ip = args.ip
|
||||
port = args.port
|
||||
verbose = args.verbose
|
||||
|
||||
melFilter = librosa.filters.mel(RATE, N_FFT, BAND_TONES, fmin=F_LO, fmax=F_HI)
|
||||
#melFilter = librosa.filters.mel(RATE, N_FFT, BAND_TONES, fmin=F_LO, fmax=F_HI)
|
||||
melFilter = librosa.filters.mel(
|
||||
sr=RATE,
|
||||
n_fft=N_FFT,
|
||||
n_mels=BAND_TONES,
|
||||
fmin=F_LO,
|
||||
fmax=F_HI
|
||||
)
|
||||
|
||||
r = redis.Redis(
|
||||
host=ip,
|
||||
@ -107,7 +114,6 @@ if( LIST_DEVICES ):
|
||||
os._exit(1)
|
||||
|
||||
|
||||
|
||||
def m_bpm(audio_data):
|
||||
"""
|
||||
This function saves slow analysis to redis
|
||||
@ -235,7 +241,9 @@ def m_spectrum(audio_data):
|
||||
|
||||
|
||||
def callback(in_data, frame_count, time_info, status):
|
||||
audio_data = numpy.fromstring(in_data, dtype=numpy.float32)
|
||||
|
||||
audio_data = numpy.frombuffer(in_data, dtype=numpy.float32)
|
||||
#audio_data = numpy.fromstring(in_data, dtype=numpy.float32)
|
||||
|
||||
global start
|
||||
start = time.time()
|
||||
|
445
redisplay.py
Normal file
445
redisplay.py
Normal file
@ -0,0 +1,445 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Redisplay: Live Audio Spectrum Analyzer
|
||||
|
||||
This script provides a real-time audio visualization tool featuring a spectrum
|
||||
analyzer, a waveform display, and multi-band VU meters. It captures audio from a
|
||||
selected input device, processes it, and displays the analysis in a user-friendly
|
||||
GUI built with Tkinter.
|
||||
|
||||
Key Features:
|
||||
- Live waveform and spectrum plotting using Matplotlib.
|
||||
- Configurable audio parameters (sample rate, window size).
|
||||
- Device selection from available system audio inputs.
|
||||
- VU meters for different frequency bands.
|
||||
- Optional connection to a Redis server (for potential future use).
|
||||
- Dark theme for the GUI using sv_ttk.
|
||||
|
||||
Author: Sam
|
||||
Date: 2025-07-14
|
||||
License: to be defined
|
||||
"""
|
||||
import tkinter as tk
|
||||
from tkinter import ttk, messagebox
|
||||
import tkinter.font as tkfont
|
||||
import sounddevice as sd
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
|
||||
from matplotlib.animation import FuncAnimation
|
||||
import queue
|
||||
import sv_ttk
|
||||
import sys
|
||||
import redis
|
||||
from scipy.fft import rfft, rfftfreq
|
||||
|
||||
class VUMeter(tk.Canvas):
|
||||
def __init__(self, parent, *args, **kwargs):
|
||||
super().__init__(parent, *args, **kwargs)
|
||||
self.config(width=40, bg='#282828', highlightthickness=0)
|
||||
self.level = 0
|
||||
self.bind("<Configure>", self._draw_meter)
|
||||
|
||||
def _draw_meter(self, event=None):
|
||||
self.delete("all")
|
||||
width = self.winfo_width()
|
||||
height = self.winfo_height()
|
||||
num_segments = 20
|
||||
segment_height = (height - 4) / num_segments
|
||||
level_segment = int(self.level * num_segments)
|
||||
|
||||
for i in range(num_segments):
|
||||
y0 = height - (i + 1) * segment_height
|
||||
y1 = height - i * segment_height - 2
|
||||
color = self._get_color(i, num_segments)
|
||||
if i < level_segment:
|
||||
self.create_rectangle(2, y0, width - 2, y1, fill=color, outline="")
|
||||
|
||||
def _get_color(self, index, total):
|
||||
if index > total * 0.85:
|
||||
return "#ff0000"
|
||||
elif index > total * 0.7:
|
||||
return "#ff4500"
|
||||
elif index > total * 0.5:
|
||||
return "#ffa500"
|
||||
else:
|
||||
return "#00dd00"
|
||||
|
||||
def set_level(self, level):
|
||||
self.level = max(0.0, min(1.0, level))
|
||||
self._draw_meter()
|
||||
|
||||
class AudioPlotterApp(tk.Tk):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.title("Live Audio Spectrum Analyzer")
|
||||
self.geometry("1000x700")
|
||||
sv_ttk.set_theme("dark")
|
||||
|
||||
self.stream = None
|
||||
self.animation = None
|
||||
self.q = queue.Queue()
|
||||
self.vu_meters = []
|
||||
self.samplerate = 44100 # Default, will be overwritten
|
||||
self.spectrum_line = None
|
||||
self.waveform_line = None
|
||||
self.xf = None
|
||||
|
||||
# Define small font style for compact controls
|
||||
self.small_font = tkfont.Font(size=9)
|
||||
style = ttk.Style(self)
|
||||
style.configure('Small.TLabel', font=self.small_font)
|
||||
style.configure('Small.TEntry', font=self.small_font)
|
||||
style.configure('Small.TButton', font=self.small_font)
|
||||
style.configure('Small.TCombobox', font=self.small_font)
|
||||
style.configure('Small.TLabelframe.Label', font=self.small_font)
|
||||
|
||||
# Apply small font to default ttk widget families in the app
|
||||
self.option_add("*TLabel.Font", self.small_font)
|
||||
self.option_add("*TEntry.Font", self.small_font)
|
||||
self.option_add("*TButton.Font", self.small_font)
|
||||
self.option_add("*TCombobox*Font", self.small_font)
|
||||
self.option_add("*Labelframe.LabelFont", self.small_font)
|
||||
|
||||
self._create_widgets()
|
||||
self._force_focus()
|
||||
|
||||
def _force_focus(self):
|
||||
if sys.platform != "darwin":
|
||||
return
|
||||
self.lift()
|
||||
self.attributes('-topmost', True)
|
||||
self.after_idle(self.attributes, '-topmost', False)
|
||||
self.focus_force()
|
||||
|
||||
def _create_widgets(self):
|
||||
main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
|
||||
main_pane.pack(fill=tk.BOTH, expand=True)
|
||||
|
||||
top_pane = ttk.PanedWindow(main_pane, orient=tk.HORIZONTAL)
|
||||
main_pane.add(top_pane, weight=1)
|
||||
|
||||
controls_frame = ttk.Frame(top_pane, padding="10")
|
||||
top_pane.add(controls_frame, weight=0)
|
||||
|
||||
vu_meter_frame = ttk.Frame(top_pane, padding="5")
|
||||
top_pane.add(vu_meter_frame, weight=1)
|
||||
|
||||
self.plot_frame = ttk.Frame(main_pane)
|
||||
main_pane.add(self.plot_frame, weight=4)
|
||||
|
||||
self._create_controls(controls_frame)
|
||||
self._create_vu_meters_area(vu_meter_frame)
|
||||
self._create_plot_area()
|
||||
|
||||
def _create_controls(self, parent):
|
||||
parent.columnconfigure(0, weight=1)
|
||||
current_row = 0
|
||||
|
||||
# --- Top frame for parameters (2 columns) ---
|
||||
top_params_frame = ttk.Frame(parent)
|
||||
top_params_frame.grid(row=current_row, column=0, sticky="ew", pady=5)
|
||||
top_params_frame.columnconfigure((0, 1), weight=1)
|
||||
current_row += 1
|
||||
|
||||
# Left column: Audio settings
|
||||
audio_params_frame = ttk.LabelFrame(top_params_frame, text="Audio Parameters")
|
||||
audio_params_frame.grid(row=0, column=0, padx=(0, 5), sticky="nsew")
|
||||
audio_params_frame.columnconfigure(1, weight=1)
|
||||
|
||||
self.settings = {}
|
||||
settings_map = {
|
||||
'window_size': ('Window Size', 1024),
|
||||
'downsample': ('Downsample', 1),
|
||||
'vu_bands': ('VU Bands', 12),
|
||||
'samplerate': ('Sample Rate', '44100'),
|
||||
}
|
||||
for i, (key, (text, default)) in enumerate(settings_map.items()):
|
||||
ttk.Label(audio_params_frame, text=text, style='Small.TLabel').grid(row=i, column=0, padx=5, pady=1, sticky="w")
|
||||
var = tk.StringVar(value=str(default))
|
||||
entry = ttk.Entry(audio_params_frame, textvariable=var, style='Small.TEntry')
|
||||
entry.grid(row=i, column=1, padx=5, pady=2, sticky="ew")
|
||||
self.settings[key] = var
|
||||
|
||||
# Right column: Redis settings (visual only)
|
||||
redis_params_frame = ttk.LabelFrame(top_params_frame, text="Redis Connection")
|
||||
redis_params_frame.grid(row=0, column=1, padx=(5, 0), sticky="nsew")
|
||||
redis_params_frame.columnconfigure(1, weight=1)
|
||||
|
||||
# Default Redis connection parameters
|
||||
self.ip_var = tk.StringVar(value="127.0.0.1")
|
||||
self.port_var = tk.StringVar(value="6379")
|
||||
|
||||
ttk.Label(redis_params_frame, text="IP Address:", style='Small.TLabel').grid(row=0, column=0, sticky="w", padx=5, pady=1)
|
||||
ttk.Entry(redis_params_frame, textvariable=self.ip_var, style='Small.TEntry').grid(row=0, column=1, sticky="ew", padx=5, pady=1)
|
||||
|
||||
ttk.Label(redis_params_frame, text="Port:", style='Small.TLabel').grid(row=1, column=0, sticky="w", padx=5, pady=1)
|
||||
ttk.Entry(redis_params_frame, textvariable=self.port_var, style='Small.TEntry').grid(row=1, column=1, sticky="ew", padx=5, pady=1)
|
||||
|
||||
ttk.Button(redis_params_frame, text="Connect", command=self.connect_redis, style='Small.TButton').grid(row=2, column=0, columnspan=2, pady=4, padx=5, sticky="ew")
|
||||
|
||||
# --- Middle frame: Device selector (full width) ---
|
||||
device_frame = ttk.LabelFrame(parent, text="Audio Device")
|
||||
device_frame.grid(row=current_row, column=0, sticky="ew", pady=5)
|
||||
device_frame.columnconfigure(0, weight=1)
|
||||
current_row += 1
|
||||
|
||||
self.device_var = tk.StringVar()
|
||||
self.devices = self._get_devices()
|
||||
self.device_menu = ttk.Combobox(device_frame, textvariable=self.device_var, values=list(self.devices.keys()), state='readonly', style='Small.TCombobox')
|
||||
self.device_menu.grid(row=0, column=0, sticky="ew", padx=5, pady=5)
|
||||
try:
|
||||
self.device_menu.set(list(self.devices.keys())[sd.default.device['input']])
|
||||
except (ValueError, IndexError, KeyError):
|
||||
if self.devices: self.device_menu.set(list(self.devices.keys())[0])
|
||||
|
||||
# --- Bottom frame: Buttons (full width) ---
|
||||
button_frame = ttk.Frame(parent)
|
||||
button_frame.grid(row=current_row, column=0, sticky="ew", pady=(10,0))
|
||||
button_frame.columnconfigure((0, 1), weight=1)
|
||||
current_row += 1
|
||||
|
||||
self.start_button = ttk.Button(button_frame, text="Start", command=self.toggle_stream, style='Small.TButton')
|
||||
self.start_button.grid(row=0, column=0, padx=5, sticky="ew")
|
||||
|
||||
self.quit_button = ttk.Button(button_frame, text="Quit", command=self.on_closing, style='Small.TButton')
|
||||
self.quit_button.grid(row=0, column=1, padx=5, sticky="ew")
|
||||
|
||||
# Attempt initial Redis connection
|
||||
self.connect_redis()
|
||||
|
||||
|
||||
|
||||
def connect_redis(self):
|
||||
"""Establish connection to Redis using current IP/Port entries."""
|
||||
try:
|
||||
host = self.ip_var.get()
|
||||
port = int(self.port_var.get())
|
||||
self.redis_conn = redis.Redis(host=host, port=port, decode_responses=True)
|
||||
self.redis_conn.ping()
|
||||
print(f"Connected to Redis at {host}:{port}")
|
||||
except Exception as e:
|
||||
print(f"Failed to connect to Redis: {e}")
|
||||
self.redis_conn = None
|
||||
|
||||
def _create_vu_meters_area(self, parent):
|
||||
self.vu_meter_container = parent
|
||||
self.vu_meter_container.bind("<Configure>", lambda e: self.recreate_vu_meters())
|
||||
self.recreate_vu_meters()
|
||||
|
||||
def recreate_vu_meters(self):
|
||||
for widget in self.vu_meter_container.winfo_children():
|
||||
widget.destroy()
|
||||
self.vu_meters.clear()
|
||||
|
||||
try:
|
||||
num_bands = int(self.settings['vu_bands'].get())
|
||||
except (ValueError, KeyError):
|
||||
num_bands = 0
|
||||
|
||||
for i in range(num_bands):
|
||||
vu_meter = VUMeter(self.vu_meter_container)
|
||||
vu_meter.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=2)
|
||||
self.vu_meters.append(vu_meter)
|
||||
|
||||
def _create_plot_area(self):
|
||||
self.fig, (self.ax_spectrum, self.ax_waveform) = plt.subplots(
|
||||
2, 1, gridspec_kw={'height_ratios': [2, 3]}
|
||||
)
|
||||
self.fig.patch.set_facecolor('#282828')
|
||||
self.canvas = FigureCanvasTkAgg(self.fig, master=self.plot_frame)
|
||||
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
|
||||
self._configure_plots()
|
||||
|
||||
def _get_devices(self):
|
||||
devices = sd.query_devices()
|
||||
return {f"{i}: {d['name']}": i for i, d in enumerate(devices) if d['max_input_channels'] > 0}
|
||||
|
||||
def _configure_plots(self):
|
||||
# Configure spectrum plot
|
||||
self.ax_spectrum.clear()
|
||||
self.ax_spectrum.set_facecolor('#282828')
|
||||
self.ax_spectrum.set_xscale('log')
|
||||
self.ax_spectrum.set_yscale('linear')
|
||||
self.ax_spectrum.set_ylabel('Magnitude (dB)', color='white')
|
||||
self.ax_spectrum.set_ylim(-60, 10)
|
||||
self.ax_spectrum.set_xlim(20, self.samplerate / 2)
|
||||
self.ax_spectrum.grid(True, color='gray', linestyle='--')
|
||||
self.ax_spectrum.tick_params(axis='x', colors='white')
|
||||
self.ax_spectrum.tick_params(axis='y', colors='white')
|
||||
for spine in self.ax_spectrum.spines.values():
|
||||
spine.set_edgecolor('white')
|
||||
|
||||
# Configure waveform plot
|
||||
self.ax_waveform.clear()
|
||||
self.ax_waveform.set_facecolor('#282828')
|
||||
self.ax_waveform.set_xlabel('Time (Samples)', color='white')
|
||||
self.ax_waveform.set_ylabel('Amplitude', color='white')
|
||||
self.ax_waveform.set_ylim(-1, 1)
|
||||
self.ax_waveform.grid(True, color='gray', linestyle='--')
|
||||
self.ax_waveform.tick_params(axis='x', colors='white')
|
||||
self.ax_waveform.tick_params(axis='y', colors='white')
|
||||
for spine in self.ax_waveform.spines.values():
|
||||
spine.set_edgecolor('white')
|
||||
|
||||
self.fig.tight_layout(pad=1.5)
|
||||
self.canvas.draw()
|
||||
|
||||
def toggle_stream(self):
|
||||
if self.stream is None:
|
||||
self.start_stream()
|
||||
else:
|
||||
self.stop_stream()
|
||||
|
||||
def start_stream(self):
|
||||
print("DEBUG: Attempting to start stream...")
|
||||
try:
|
||||
self.recreate_vu_meters()
|
||||
device_name = self.device_var.get()
|
||||
device_id = self.devices[device_name]
|
||||
device_info = sd.query_devices(device_id, 'input')
|
||||
print(f"DEBUG: Selected device: {device_name} (ID: {device_id})")
|
||||
|
||||
try:
|
||||
self.samplerate = float(self.settings['samplerate'].get())
|
||||
print(f"DEBUG: Using user-defined sample rate: {self.samplerate} Hz")
|
||||
except (ValueError, KeyError):
|
||||
self.samplerate = device_info['default_samplerate']
|
||||
self.settings['samplerate'].set(str(int(self.samplerate)))
|
||||
print(f"DEBUG: Invalid sample rate, using device default: {self.samplerate} Hz")
|
||||
|
||||
self.window_size = int(self.settings['window_size'].get())
|
||||
print(f"DEBUG: Using window size: {self.window_size}")
|
||||
|
||||
# Pre-calculate frequency bins
|
||||
self.xf = rfftfreq(self.window_size, 1 / self.samplerate)
|
||||
|
||||
# Configure plots and pre-create line objects
|
||||
self._configure_plots()
|
||||
self.ax_waveform.set_xlim(0, self.window_size)
|
||||
self.spectrum_line, = self.ax_spectrum.plot(self.xf, np.zeros_like(self.xf), color='#00dd00')
|
||||
self.waveform_line, = self.ax_waveform.plot(np.arange(self.window_size), np.zeros(self.window_size), color='#00dd00')
|
||||
|
||||
self.stream = sd.InputStream(
|
||||
device=device_id, channels=1,
|
||||
samplerate=self.samplerate, callback=self.audio_callback,
|
||||
blocksize=self.window_size, latency='low')
|
||||
|
||||
self.animation = FuncAnimation(self.fig, self.update_plot, interval=30, blit=True, cache_frame_data=False, save_count=sys.maxsize)
|
||||
self.stream.start()
|
||||
self.start_button.config(text="Stop")
|
||||
print("DEBUG: Stream started successfully.")
|
||||
|
||||
except Exception as e:
|
||||
print(f"ERROR: Failed to start stream: {e}")
|
||||
messagebox.showerror("Error starting stream", str(e))
|
||||
self.stop_stream()
|
||||
|
||||
|
||||
def stop_stream(self):
|
||||
if getattr(self, 'animation', None):
|
||||
self.animation.event_source.stop()
|
||||
self.animation = None
|
||||
print("DEBUG: Animation stopped.")
|
||||
if self.stream:
|
||||
self.stream.stop()
|
||||
self.stream.close()
|
||||
self.stream = None
|
||||
print("DEBUG: Audio stream stopped and closed.")
|
||||
with self.q.mutex:
|
||||
self.q.queue.clear()
|
||||
print("DEBUG: Queue cleared.")
|
||||
self.start_button.config(text="Start")
|
||||
|
||||
|
||||
def audio_callback(self, indata, frames, time, status):
|
||||
if status:
|
||||
print(f"ERROR in audio_callback: {status}", file=sys.stderr)
|
||||
|
||||
if status:
|
||||
print(f"ERROR in audio_callback: {status}", file=sys.stderr)
|
||||
self.q.put(indata[:, 0])
|
||||
|
||||
|
||||
def update_plot(self, frame):
|
||||
try:
|
||||
data = self.q.get_nowait()
|
||||
except queue.Empty:
|
||||
return (self.spectrum_line, self.waveform_line)
|
||||
|
||||
if not self.spectrum_line or not self.waveform_line:
|
||||
return (self.spectrum_line, self.waveform_line)
|
||||
|
||||
try:
|
||||
N = len(data)
|
||||
if N != self.window_size:
|
||||
print(f"WARN: Mismatched data size. Expected {self.window_size}, got {N}")
|
||||
return (self.plot_line,)
|
||||
|
||||
# Calculate RMS of time-domain signal
|
||||
self.rms = float(np.sqrt(np.mean(np.square(data))))
|
||||
|
||||
yf = rfft(data)
|
||||
magnitude_db = 20 * np.log10(np.abs(yf) / N + 1e-9)
|
||||
self.spectrum_line.set_ydata(magnitude_db)
|
||||
self.waveform_line.set_ydata(data)
|
||||
|
||||
# print(f"\rDEBUG: Queue: {self.q.qsize()}, Plot max: {np.max(magnitude_db):.2f} dB", end='')
|
||||
|
||||
self.update_vu_meters(np.abs(yf) / N)
|
||||
|
||||
except Exception as e:
|
||||
print(f"\nERROR in update_plot: {e}")
|
||||
|
||||
return (self.spectrum_line, self.waveform_line)
|
||||
|
||||
def update_vu_meters(self, magnitude):
|
||||
if not self.vu_meters or self.xf is None:
|
||||
return
|
||||
|
||||
num_bands = len(self.vu_meters)
|
||||
max_freq = self.samplerate / 2
|
||||
if max_freq <= 20:
|
||||
max_freq = 20000
|
||||
freq_bins = np.logspace(np.log10(20), np.log10(max_freq), num_bands + 1)
|
||||
|
||||
levels = []
|
||||
for i in range(num_bands):
|
||||
low_freq, high_freq = freq_bins[i], freq_bins[i+1]
|
||||
idx = np.where((self.xf >= low_freq) & (self.xf < high_freq))
|
||||
if len(idx[0]) > 0:
|
||||
avg_mag = np.mean(magnitude[idx])
|
||||
else:
|
||||
avg_mag = 0.0
|
||||
|
||||
# Convert linear magnitude to dB
|
||||
level_db = 20 * np.log10(avg_mag + 1e-9) # -inf..0
|
||||
# Map -60 dB → 0 and 0 dB → 10
|
||||
level10 = np.clip((level_db + 60) / 6.0, 0.0, 10.0)
|
||||
|
||||
levels.append(round(float(level10), 2))
|
||||
# Vu meter expects 0-1 range
|
||||
self.vu_meters[i].set_level(level10 / 10.0)
|
||||
|
||||
# Build text representation and print once per call
|
||||
self.tenbins = f"{levels}"
|
||||
# Debug print
|
||||
#print(f"tenbins={self.tenbins}, rms={self.rms:.3f}")
|
||||
|
||||
# Push to Redis
|
||||
if getattr(self, 'redis_conn', None):
|
||||
try:
|
||||
self.redis_conn.set('spectrum_10', self.tenbins)
|
||||
self.redis_conn.set('rms', f"{self.rms:.3f}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Redis error: {e}")
|
||||
|
||||
def on_closing(self):
|
||||
self.stop_stream()
|
||||
self.destroy()
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = AudioPlotterApp()
|
||||
app.protocol("WM_DELETE_WINDOW", app.on_closing)
|
||||
app.mainloop()
|
Loading…
Reference in New Issue
Block a user